Building NeuroStash - I

Farhan Khoja

The Problem That Started It All

We've all been there—drowning in documents scattered across emails, cloud drives, and local folders. Important information buried in PDFs, critical insights lost in meeting notes, and that one crucial document that you know exists but can't find when you need it most.

After experiencing this pain point repeatedly in my development work, I decided to build NeuroStash: an open-source, self-hostable knowledge management system designed to ingest, process, and intelligently extract information from documents. But this isn't just another file storage solution; it's a production-ready system built with scalability, security, and developer experience in mind.

Project structure

NeuroStash
|- app/ # HTTP endpoints and request/response handlers
|- core/ # Configuration and Database connection
|- dao/ # Data Access Objects - Database Operations
|- consumer/ # Asynchronous message processing
|- processor/ # Document processing pipeline
|- migrations/ # Database schema migrations

Database Schema Design: Multi-tenancy and Security First

  • The core of NeuroStash lies in its carefully designed database schema, which balances performance, security, and flexibility.

Multi-tenant User Management

class UserClient(Base, TimestampMixin):
    __tablename__ = "user_clients"

    id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    role: Mapped[ClientRoleEnum] = mapped_column(
        SQLEnum(ClientRoleEnum, name="client_roles", create_type=False), 
        nullable=False
    )

    api_keys: Mapped[List["ApiKey"]] = relationship(
        back_populates="user_client", cascade="all, delete-orphan"
    )
    documents: Mapped[List["DocumentRegistry"]] = relationship(
        back_populates="user_client"
    )

Document Lifecycle Management

class DocumentRegistry(Base, TimestampMixin):
    __tablename__ = "documents_registry"

    user_id: Mapped[int] = mapped_column(ForeignKey("user_clients.id"))
    file_name: Mapped[str] = mapped_column(String(100), nullable=False)
    object_key: Mapped[str] = mapped_column(String(150), nullable=False)

    lock_status: Mapped[bool] = mapped_column(Boolean, nullable=False)
    op_status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum), 
        nullable=False,
        server_default=OperationStatusEnum.PENDING.value
    )

    __table_args__ = (
        UniqueConstraint("user_id", "file_name", name="idx_unique_filename"),
        Index("idx_file_registry_user_id", "user_id", "lock_status", "op_status"),
    )
  • The lock_status field prevents concurrent processing of the same document, while op_status provides clear visibility into processing state.

  • The compound index on (user_id, lock_status, op_status) enables efficient queries for finding documents that are ready for processing, as sketched below.
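  • A query shaped to hit that index might look like this (a minimal sketch; the actual DAO method in the repo may differ):
from typing import List

from sqlalchemy import select
from sqlalchemy.orm import Session

def find_documents_ready_for_processing(
    session: Session, user_id: int
) -> List["DocumentRegistry"]:
    # filtering on (user_id, lock_status, op_status) mirrors the column order of
    # idx_file_registry_user_id, so Postgres can answer this from the index
    stmt = select(DocumentRegistry).where(
        DocumentRegistry.user_id == user_id,
        DocumentRegistry.lock_status.is_(False),
        DocumentRegistry.op_status == OperationStatusEnum.PENDING,
    )
    return list(session.scalars(stmt))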

Encryption Key Management

class EncryptionKey(Base, TimestampMixin):
    __tablename__ = "encryption_keys"

    symmetric_key: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, server_default=text("false"))
    expired_at: Mapped[Optional[datetime]] = mapped_column(TIMESTAMP(timezone=True))

    __table_args__ = (
        Index("idx_encryption_keys_active", "id", postgresql_where=Column("is_active")),
    )
  • The partial index on active keys ensures lightning-fast lookups for the current encryption key, as in the sketch below.
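  • A lookup for the current key can lean on that partial index; a minimal sketch (the real DAO call may differ):
from typing import Optional

from sqlalchemy import select
from sqlalchemy.orm import Session

def get_active_encryption_key(session: Session) -> Optional["EncryptionKey"]:
    # the is_active filter matches the partial index predicate, so only the
    # small set of active rows is ever scanned
    stmt = select(EncryptionKey).where(EncryptionKey.is_active.is_(True))
    return session.scalars(stmt).first()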

Knowledge Base Organization

class KnowledgeBaseDocument(Base, TimestampMixin):
    knowledge_base_id: Mapped[int] = mapped_column(ForeignKey("knowledge_bases.id"))
    document_id: Mapped[int] = mapped_column(ForeignKey("documents_registry.id"))
    status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        server_default=OperationStatusEnum.PENDING.value
    )
  • This design allows a document to belong to multiple knowledge bases while tracking the processing status of each relationship independently, which is crucial for systems that need to rebuild indexes or reprocess documents. A sketch of that linkage follows.
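  • Linking a document into several knowledge bases is then just one row per relationship; a minimal sketch (the helper name is hypothetical):
from typing import List

from sqlalchemy.orm import Session

def attach_document_to_knowledge_bases(
    session: Session, document_id: int, knowledge_base_ids: List[int]
) -> None:
    for kb_id in knowledge_base_ids:
        # each link starts as PENDING (the server default) and its status is
        # updated independently, so one knowledge base can be rebuilt or
        # reprocessed without touching the others
        session.add(
            KnowledgeBaseDocument(knowledge_base_id=kb_id, document_id=document_id)
        )
    session.commit()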

Configuration Management: Production Ready from the Start

  • Professional applications require robust configuration management, and NeuroStash uses Pydantic Settings for type-safe, environment-aware configuration:
class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=os.path.join(os.path.dirname(__file__), "..", "..", ".env"),
        env_file_encoding="utf-8",
        extra="ignore",
    )

    POSTGRES_SERVER: str
    POSTGRES_PORT: int
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str
    POSTGRES_DB: str

    @computed_field
    @property
    def SQLALCHEMY_DATABASE_URI(self) -> PostgresDsn:
        return MultiHostUrl.build(
            scheme="postgresql+psycopg",
            username=self.POSTGRES_USER,
            password=self.POSTGRES_PASSWORD,
            host=self.POSTGRES_SERVER,
            port=self.POSTGRES_PORT,
            path=self.POSTGRES_DB,
        )
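  • Instantiating the settings once validates the environment eagerly, so a missing or mistyped variable fails at startup rather than at the first database call (sketch; output shape illustrative):
settings = Settings()  # reads the .env file / environment and validates types

# the computed DSN can be handed straight to SQLAlchemy
print(settings.SQLALCHEMY_DATABASE_URI)
# e.g. postgresql+psycopg://<user>:<password>@<host>:5432/<db>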

Database Migrations

  • Database migrations are often an afterthought, but they’re critical for production systems.

  • NeuroStash uses Alembic, with several key optimizations.

Explicit Enum Control Strategy

  • I learned this lesson the hard way: Alembic's auto-generated enum handling has a critical flaw during migration downgrades. When you downgrade a migration, the enums aren't properly deleted, causing "enum already exists" errors when you try to run the migration again.
from alembic import op
from sqlalchemy.dialects import postgresql

client_role_enum = postgresql.ENUM(
    "USER", "ADMIN", name="client_roles", create_type=False
)
operation_status_enum = postgresql.ENUM(
    "PENDING", "SUCCESS", "FAILED", name="operation_status", create_type=False
)

def upgrade() -> None:
    # Explicitly create enums with checkfirst=True
    client_role_enum.create(op.get_bind(), checkfirst=True)
    operation_status_enum.create(op.get_bind(), checkfirst=True)

    # Then create tables that reference them
    op.create_table("user_clients", ...)

def downgrade() -> None:
    # Drop tables first
    op.drop_table("user_clients")

    # Then explicitly drop enums
    operation_status_enum.drop(op.get_bind(), checkfirst=True)
    client_role_enum.drop(op.get_bind(), checkfirst=True)
  • When testing migrations locally, you can reliably downgrade and re-run without manual database cleanup

  • Production rollbacks work cleanly without manual intervention

  • checkfirst=True is crucial: it prevents errors if the enum already exists during the upgrade or doesn't exist during the downgrade.

Migration Environment Setup

  • The env.py configuration demonstrates production-ready practices
db_url = str(settings.SQLALCHEMY_DATABASE_URI)
config.set_main_option("sqlalchemy.url", db_url)
  • Instead of hard-coding the connection URL in alembic.ini, the database URL is derived from the application settings inside env.py, as in the condensed sketch below.
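  • A condensed version of that env.py wiring might look like this (simplified sketch; the import path of the declarative Base is an assumption):
from alembic import context
from sqlalchemy import engine_from_config, pool

from app.core.config import settings
from app.dao.models import Base  # assumed location of the declarative Base

config = context.config
config.set_main_option("sqlalchemy.url", str(settings.SQLALCHEMY_DATABASE_URI))
target_metadata = Base.metadata  # lets autogenerate diff against the models

def run_migrations_online() -> None:
    connectable = engine_from_config(
        config.get_section(config.config_ini_section) or {},
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()

run_migrations_online()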

Database Connection Management

  • The database connection setup in db.py includes several production-critical configurations:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings

engine = create_engine(
    url=str(settings.SQLALCHEMY_DATABASE_URI),
    pool_pre_ping=True,
    connect_args={"options": "-c timezone=utc"},
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  • pool_pre_ping=True: This is crucial for long-running applications. SQLAlchemy will test connections before use, automatically handling dropped connections that can occur in cloud environments or during database maintenance windows. Without this, you'd get cryptic connection errors in production.

  • timezone=utc: Forces all database connections to use UTC, preventing timezone-related bugs when the application runs in different regions or when daylight saving time changes occur. This is especially important for multi-tenant applications where users might be in different timezones.

  • Session Configuration: autocommit=False and autoflush=False provide explicit control over transaction boundaries, which is essential for maintaining data consistency in complex operations involving multiple tables (see the sketch after this list).

    • autocommit determines whether each SQL statement is committed to the database immediately after execution, rather than waiting for an explicit commit() call.

    • autoflush controls whether pending changes in the session are automatically flushed to the database before executing queries.
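  • In practice that means a request- or task-scoped session with an explicit commit/rollback boundary; a minimal sketch (the actual dependency in app/ may differ):
from contextlib import contextmanager

from app.core.db import SessionLocal  # db.py shown above; module path assumed

@contextmanager
def get_session():
    session = SessionLocal()
    try:
        yield session
        session.commit()    # nothing is persisted until this explicit commit
    except Exception:
        session.rollback()  # multi-table operations fail or succeed as a unit
        raise
    finally:
        session.close()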

Database Initialization Strategy

  • The pre_start.py script implements a robust database readiness check
import logging

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session as SqlSession
from tenacity import after_log, before_log, retry, stop_after_attempt, wait_fixed

logger = logging.getLogger(__name__)

max_tries = 60 * 5  # retry budget; values illustrative
wait_seconds = 1

@retry(
    stop=stop_after_attempt(max_tries),
    wait=wait_fixed(wait_seconds),
    before=before_log(logger, logging.DEBUG),
    after=after_log(logger, logging.INFO),
)
def init(db_engine: Engine) -> None:
    try:
        # open a short-lived session and run a trivial query to confirm the DB is reachable
        with SqlSession(db_engine) as session:
            session.execute(text("SELECT 1"))
        logger.info("database connection successfully")
    except Exception as e:
        logger.error(f"database connection/query failed: {e}")
        raise
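  • A minimal entry point for that check, run before the API or consumer starts (sketch; the actual script wiring may differ):
from app.core.db import engine  # engine from db.py above; module path assumed

def main() -> None:
    logger.info("initializing service")
    init(engine)
    logger.info("service finished initializing")

if __name__ == "__main__":
    main()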

Current Status

  • Document Processing pipeline

  • Vector search integration

  • API Authentication

  • So much more…

Takeaways

  • Building NeuroStash is an ongoing journey that has already taught me plenty, with more lessons still to come.

  • Start with a strong foundation: proper configuration management, sound database design, and thinking long term about where the product is headed pay dividends later.

  • Building encryption, authentication, and multi-tenancy in from the beginning is easier than retrofitting them later.

  • Explicit control is better for production systems.

Check out the code: https://github.com/DEVunderdog/NeuroStash

Connect with me: https://x.com/fkhoja098
