Building Neurostash - I

The Problem That Started It All
We've all been there—drowning in documents scattered across emails, cloud drives, and local folders. Important information buried in PDFs, critical insights lost in meeting notes, and that one crucial document that you know exists but can't find when you need it most.
After experiencing this pain point repeatedly in my development work, I decided to build NeuroStash: an open-source, self-hostable knowledge management system designed to ingest, process, and intelligently extract information from documents. But this isn't just another file storage solution—it's a production-ready system built with scalability, security, and developer experience in mind.
Project structure
NeuroStash
|- app/ # HTTP endpoints and request/response handlers
|- core/ # Configuration and Database connection
|- dao/ # Data Access Objects - Database Operations
|- consumer/ # Asynchronous message processing
|- processor/ # Document processing pipeline
|- migrations/ # Database schema migrations
Database Schema Design: Multi-tenancy and Security First
- The core of NeuroStash lies in its carefully designed database schema that balances performance, security, and flexibility
Multi-tenant User Management
class UserClient(Base, TimestampMixin):
    __tablename__ = "user_clients"

    id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    email: Mapped[str] = mapped_column(String(255), nullable=False, unique=True)
    role: Mapped[ClientRoleEnum] = mapped_column(
        SQLEnum(ClientRoleEnum, name="client_roles", create_type=False),
        nullable=False,
    )

    api_keys: Mapped[List["ApiKey"]] = relationship(
        back_populates="user_client", cascade="all, delete-orphan"
    )
    documents: Mapped[List["DocumentRegistry"]] = relationship(
        back_populates="user_client"
    )
Document Lifecycle Management
class DocumentRegistry(Base, TimestampMixin):
    __tablename__ = "documents_registry"

    user_id: Mapped[int] = mapped_column(ForeignKey("user_clients.id"))
    file_name: Mapped[str] = mapped_column(String(100), nullable=False)
    object_key: Mapped[str] = mapped_column(String(150), nullable=False)
    lock_status: Mapped[bool] = mapped_column(Boolean, nullable=False)
    op_status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        nullable=False,
        server_default=OperationStatusEnum.PENDING.value,
    )

    __table_args__ = (
        UniqueConstraint("user_id", "file_name", name="idx_unique_filename"),
        Index("idx_file_registry_user_id", "user_id", "lock_status", "op_status"),
    )
The lock_status field prevents concurrent processing of the same document, while op_status provides clear visibility into processing state. The compound index on (user_id, lock_status, op_status) enables efficient queries for finding documents ready for processing.
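As an illustration, a query that picks up documents ready for processing can lean directly on that index. This is a minimal sketch; the helper name and session handling are assumptions, not NeuroStash's actual DAO code:
from sqlalchemy import select
from sqlalchemy.orm import Session

# Hypothetical DAO helper: unlocked documents still pending processing for a user.
# All three predicates line up with the (user_id, lock_status, op_status) index.
def list_pending_documents(session: Session, user_id: int) -> list["DocumentRegistry"]:
    stmt = select(DocumentRegistry).where(
        DocumentRegistry.user_id == user_id,
        DocumentRegistry.lock_status.is_(False),
        DocumentRegistry.op_status == OperationStatusEnum.PENDING,
    )
    return list(session.scalars(stmt))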
Encryption Key Management
class EncryptionKey(Base, TimestampMixin):
    __tablename__ = "encryption_keys"

    symmetric_key: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
    is_active: Mapped[bool] = mapped_column(Boolean, server_default=text("false"))
    expired_at: Mapped[Optional[datetime]] = mapped_column(TIMESTAMP(timezone=True))

    __table_args__ = (
        Index("idx_encryption_keys_active", "id", postgresql_where=Column("is_active")),
    )
- The partial index on active keys ensures lightning-fast lookups for current encryption keys.
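A lookup for the current key then stays cheap no matter how many rotated keys accumulate. A minimal sketch (the helper name is illustrative, not NeuroStash's actual code):
from sqlalchemy import select
from sqlalchemy.orm import Session

# Hypothetical helper: the partial index only covers rows with is_active = true,
# so this query scans a tiny index even with many expired keys in the table.
def get_active_key(session: Session) -> EncryptionKey | None:
    stmt = select(EncryptionKey).where(EncryptionKey.is_active.is_(True)).limit(1)
    return session.scalars(stmt).first()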
Knowledge Base Organization
class KnowledgeBaseDocument(Base, TimestampMixin):
    knowledge_base_id: Mapped[int] = mapped_column(ForeignKey("knowledge_bases.id"))
    document_id: Mapped[int] = mapped_column(ForeignKey("documents_registry.id"))
    status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        server_default=OperationStatusEnum.PENDING.value,
    )
- This design allows documents to belong to multiple knowledge bases while tracking the processing status of each relationship independently, which is crucial for systems that need to rebuild indexes or reprocess documents
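To make that concrete, here is a hedged sketch of querying the join table for relationships that still need (re)processing; the helper name is illustrative:
from sqlalchemy import select
from sqlalchemy.orm import Session

# Hypothetical query: knowledge-base links for one document that are not yet
# processed, so an index rebuild can reprocess only the relationships it needs to.
def pending_kb_links(session: Session, document_id: int) -> list["KnowledgeBaseDocument"]:
    stmt = select(KnowledgeBaseDocument).where(
        KnowledgeBaseDocument.document_id == document_id,
        KnowledgeBaseDocument.status == OperationStatusEnum.PENDING,
    )
    return list(session.scalars(stmt))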
Configuration Management: Production Ready from the Start
- Professional applications require robust configuration management, and NeuroStash uses Pydantic Settings for type-safe, environment-aware configuration
class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=os.path.join(os.path.dirname(__file__), "..", "..", ".env"),
        env_file_encoding="utf-8",
        extra="ignore",
    )

    POSTGRES_SERVER: str
    POSTGRES_PORT: int
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str
    POSTGRES_DB: str

    @computed_field
    @property
    def SQLALCHEMY_DATABASE_URI(self) -> PostgresDsn:
        return MultiHostUrl.build(
            scheme="postgresql+psycopg",
            username=self.POSTGRES_USER,
            password=self.POSTGRES_PASSWORD,
            host=self.POSTGRES_SERVER,
            port=self.POSTGRES_PORT,
            path=self.POSTGRES_DB,
        )
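A quick usage sketch (illustrative only): the settings object is created once, and the computed DSN is consumed wherever a connection string is needed.
# Reads values from the environment or the .env file.
settings = Settings()

# The computed property yields a ready-to-use SQLAlchemy DSN, for example:
# postgresql+psycopg://user:password@postgres-host:5432/neurostash
db_url = str(settings.SQLALCHEMY_DATABASE_URI)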
Database Migrations
Database migrations are often an afterthought, but they’re critical for production systems.
NeuroStash uses Alembic with several key optimizations:
Explicit Enum Control Strategy
- I learned this lesson the hard way: Alembic’s auto-generated enum handling has a critical flaw during migration downgrades. When you downgrade a migration, the enums aren’t properly deleted, causing “enum already exists” errors when you try to run the migration again.
from alembic import op
from sqlalchemy.dialects import postgresql

client_role_enum = postgresql.ENUM(
    "USER", "ADMIN", name="client_roles", create_type=False
)
operation_status_enum = postgresql.ENUM(
    "PENDING", "SUCCESS", "FAILED", name="operation_status", create_type=False
)

def upgrade() -> None:
    # Explicitly create enums with checkfirst=True
    client_role_enum.create(op.get_bind(), checkfirst=True)
    operation_status_enum.create(op.get_bind(), checkfirst=True)
    # Then create tables that reference them
    op.create_table("user_clients", ...)

def downgrade() -> None:
    # Drop tables first
    op.drop_table("user_clients")
    # Then explicitly drop enums
    operation_status_enum.drop(op.get_bind(), checkfirst=True)
    client_role_enum.drop(op.get_bind(), checkfirst=True)
- When testing migrations locally, you can reliably downgrade and re-run without manual database cleanup
- Production rollbacks work cleanly without manual intervention
checkfirst=True is crucial: it prevents errors if the enum already exists during the upgrade or doesn’t exist during the downgrade.
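On the table-creation side, the column simply reuses the enum object declared with create_type=False, so SQLAlchemy never tries to emit another CREATE TYPE. Here is a sketch of what the abbreviated op.create_table call might look like (column list shortened for illustration, not the actual migration):
import sqlalchemy as sa

op.create_table(
    "user_clients",
    sa.Column("id", sa.Integer(), sa.Identity(), primary_key=True),
    sa.Column("email", sa.String(length=255), nullable=False, unique=True),
    # Reuses the type created explicitly in upgrade(); no implicit CREATE TYPE here.
    sa.Column("role", client_role_enum, nullable=False),
)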
Migration Environment Setup
- The env.py configuration demonstrates production-ready practices:
db_url = str(settings.SQLALCHEMY_DATABASE_URI)
config.set_main_option("sqlalchemy.url", db_url)
- Instead of hardcoding the database connection URL, it is managed through the application configuration and injected in env.py
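The rest of env.py can stay close to the standard Alembic online-migration template; roughly like this sketch, with the Base import path assumed for the example:
from alembic import context
from sqlalchemy import engine_from_config, pool

from app.db.models import Base  # assumed import path for the declarative Base

config = context.config
target_metadata = Base.metadata

def run_migrations_online() -> None:
    # The URL set via config.set_main_option above is picked up here.
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()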
Database Connection Management
- The database connection setup in db.py includes several production-critical configurations:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.core.config import settings

engine = create_engine(
    url=str(settings.SQLALCHEMY_DATABASE_URI),
    pool_pre_ping=True,
    connect_args={"options": "-c timezone=utc"},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
pool_pre_ping=True: This is crucial for long-running applications. SQLAlchemy tests connections before use, automatically handling dropped connections that can occur in cloud environments or during database maintenance windows. Without it, you'd get cryptic connection errors in production.
timezone=utc: Forces all database connections to use UTC, preventing timezone-related bugs when the application runs in different regions or when daylight saving time changes occur. This is especially important for multi-tenant applications where users might be in different timezones.
Session configuration: autocommit=False and autoflush=False provide explicit control over transaction boundaries, which is essential for maintaining data consistency in complex operations involving multiple tables. autocommit determines whether each SQL statement is committed to the database immediately after execution, rather than waiting for an explicit commit() call. autoflush controls whether pending changes in the session are automatically sent to the database before executing queries.
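With those defaults, request-scoped sessions are usually handed out through a small dependency that leaves the transaction boundary to the caller. A minimal sketch, assuming a FastAPI-style generator dependency and an import path chosen for the example (not necessarily how NeuroStash wires it):
from collections.abc import Generator

from sqlalchemy.orm import Session

from app.core.db import SessionLocal  # assumed module path for this example

def get_db() -> Generator[Session, None, None]:
    # One session per request; commit/rollback stays explicit in the caller
    # because autocommit and autoflush are both disabled.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()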
Database Initialization Strategy
- The pre_start.py script implements a robust database readiness check
@retry(
    stop=stop_after_attempt(max_tries),
    wait=wait_fixed(wait_seconds),
    before=before_log(logger, logging.DEBUG),
    after=after_log(logger, logging.INFO),
)
def init(db_engine: Engine) -> None:
    try:
        with SqlSession(db_engine) as session:
            session.execute(text("SELECT 1"))
            logger.info("database connection successful")
    except Exception as e:
        logger.error(f"database connection/query failed: {e}")
        raise
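All that's left is an entrypoint that runs the check against the application engine before the API or a migration job starts; a sketch of that wiring (import path assumed for the example):
from app.core.db import engine  # assumed module path for this example

def main() -> None:
    logger.info("initializing service")
    init(engine)
    logger.info("service finished initializing")

if __name__ == "__main__":
    main()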
Current Status
- Document processing pipeline
- Vector search integration
- API authentication
- So much more…
Takeaways
Building NeuroStash is an ongoing journey; it has taught me plenty of things, and there is more to come.
I learned that starting with a strong foundation of proper configuration and database design, and thinking long term about the product, pays dividends later.
Building encryption, authentication, and multi-tenancy in from the beginning is easier than retrofitting them.
Explicit control is better for production systems.
Check out the code: https://github.com/DEVunderdog/NeuroStash
Connect with me: https://x.com/fkhoja098