Backend Routes - "push"
Backend strategies
Replicache defines "push" and "pull" endpoints that your server must implement for sync to work. There are various ways to write push and pull endpoints. As the heading suggests, we will write the push endpoint in this blog.
But before that, let's understand the different backend strategies Replicache supports so we can proceed with writing the code.
The main difference between the strategies is how they calculate the patch/diff needed by the pull endpoint. Different approaches require different state to be stored in the backend database and different logic in the push and pull endpoints.
Also, some use cases are only supported well by some strategies. Notably:
Read Auth: When not all data is accessible to all users. In an application like Linear, the tasks of one organisation should not be synced to users from other organisations.
Partial Sync: When a user only syncs some of the data they have access to. In an application like GitHub, each user has access to many GB of data, but only a small subset should be synced to the client at any given time.
Now let's discuss each of the different strategies
Reset Strategy
Basic description: This is the simplest strategy. It sends the entire client view in every pull response, so no diff is calculated during pull. This is quite inefficient and not recommended for apps where data changes frequently
When to use: For apps with very small amounts of data, or where the data changes infrequently
Implementation: easy 🟢
Performance: bad 🔴 (each pull computes and returns the entire client view)
Read Auth: easy 🟢 (because each client gets only the data they have access to)
Partial Sync: easy 🟢
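To make the reset idea concrete, here is a minimal sketch of what a reset-style pull patch could look like. The `clear`, `put` and `del` operation shapes follow Replicache's pull response patch format; the `TodoRow` type, the `buildResetPatch` helper and the `todo/<id>` key format are assumptions made up for this illustration.

```typescript
// Patch operation shapes used in a Replicache pull response.
type PatchOperation =
  | { op: "clear" }
  | { op: "put"; key: string; value: unknown }
  | { op: "del"; key: string };

// Hypothetical row shape for this sketch.
type TodoRow = { id: string; title: string; completed: boolean };

// Reset strategy: wipe the client's local state, then re-send every row
// the user can see. Nothing is diffed against a previous pull.
function buildResetPatch(todos: TodoRow[]): PatchOperation[] {
  return [
    { op: "clear" },
    ...todos.map(
      (todo): PatchOperation => ({
        op: "put",
        key: `todo/${todo.id}`, // assumed key format
        value: todo,
      }),
    ),
  ];
}
```

The cost is visible in the shape of the function: the patch always grows with the total amount of data, not with the amount of change.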
Global Version Strategy
Basic description:
A single version is stored in the database and incremented on each push. Entities have a lastModifiedVersion field, which is the global version at which the entity was last modified
The global version is returned as a cookie with every pull and sent back in the request of the next pull, where it is used to calculate the diff (all the entities whose lastModifiedVersion is greater than the version of the last pull)
When to use: Simple apps with low concurrency, where data is synced to all users (all users have access to the same set of data, for example a local auction house)
Performance: bad 🔴 Limited to about 50 pushes/second across the entire app, which means your server can only handle about 50 push operations every second. (This happens because every push has to lock and update the same global version, so pushes end up waiting on each other)
Implementation: easy 🟢
Read Auth: hard 🔴 (because, as discussed, there is one global version and every client diffs against it over the same set of data)
Partial Sync: hard 🔴
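The diff calculation described above can be sketched in a few lines. This is only an illustration under assumptions: the `VersionedTodo` type and the `changedSince` helper are hypothetical names, and the cookie is assumed to be a plain number (or null on the very first pull).

```typescript
// Hypothetical entity shape: every synced row records the global version
// at which it was last modified.
type VersionedTodo = { id: string; title: string; lastModifiedVersion: number };

// Global version strategy: the cookie from the previous pull is the global
// version the client last saw. Everything modified after it is the diff.
function changedSince(todos: VersionedTodo[], cookie: number | null): VersionedTodo[] {
  const lastSeenVersion = cookie ?? 0; // first pull: send everything
  return todos.filter((todo) => todo.lastModifiedVersion > lastSeenVersion);
}
```

Note that a filter like this can only find rows that still exist, so deletions would need something extra, such as a soft-delete flag on the row.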
Per-Space Version Strategy
Basic description:
Same as the global version strategy, except it has a concept of a space. There is one version to maintain per space rather than one for the whole server
Think of a space as an organisation in Linear
When to use: Apps where data can be synced per space, for example per organisation in Linear
Performance: 🟡 Limited to 50 pushes/second/space
Implementation: easy 🟢
Read Auth: 🟡 Restricting access across spaces is built in: a member of organisation 1 cannot view data from organisation 2. But read auth inside organisation 1 (like scoping tasks to teams) can't be implemented; all users within a space see everything in that space
Partial Sync: 🟡 You can choose which spaces to sync to each client, but within a space all data is synced
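As a rough illustration of why this scales better than a single global version, here is a small in-memory sketch with one counter per space. The `spaceVersions` map and the `bumpSpaceVersion` helper are hypothetical; in a real backend each counter would be a row in the database that a push locks and increments.

```typescript
// One version counter per space instead of one for the whole server.
const spaceVersions = new Map<string, number>();

// Called once per push: only the counter of the affected space moves,
// so pushes to different spaces never wait on each other.
function bumpSpaceVersion(spaceId: string): number {
  const nextVersion = (spaceVersions.get(spaceId) ?? 0) + 1;
  spaceVersions.set(spaceId, nextVersion);
  return nextVersion;
}
```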
Row Version Strategy
Description:
The client view can be computed dynamically for a specific user. It can be any arbitrary query over the database
Does not require global locks or the concept of spaces
Performance: 🟢 Close to traditional non-transactional servers
Implementation: 🔴 Most difficult
Read Auth: 🟢 Fully supported. Each individual data item is authorised before being sent to the client
Partial Sync: 🟢 Fully supported. Sync any arbitrary subset of data based on whatever logic you like
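The details of this strategy belong to the pull endpoint and CVR, but the core idea can be sketched already: remember which row versions a client has seen, and compare them with the rows the user can see right now. The `RowVersions` type and `diffRowVersions` helper below are hypothetical names for this sketch, not the code used later in the series.

```typescript
// Map of entity id -> rowVersion, as seen by one client group.
type RowVersions = Record<string, number>;

// Compare what the client saw last time with what it should see now.
function diffRowVersions(
  previous: RowVersions,
  current: RowVersions,
): { puts: string[]; dels: string[] } {
  // New rows, or rows whose version changed, must be sent again.
  const puts = Object.keys(current).filter((id) => previous[id] !== current[id]);
  // Rows the client has but should no longer see must be deleted.
  const dels = Object.keys(previous).filter((id) => !(id in current));
  return { puts, dels };
}
```

Because `current` can be the result of any per-user query, read auth and partial sync fall out naturally: a row the user loses access to simply shows up in `dels`.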
After this basic yet tedious read-through, which strategy do you think you need in your app?
We will continue with the Row Version Strategy, the most complex yet most scalable approach, throughout the rest of the blog.
Let's look at our database schema and what state we are going to store in our database.
Open up the schema.prisma file inside the prisma folder of the models package in our monorepo.
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model User {
id String @id @default(uuid())
email String @unique
clientGroups ClientGroup[]
todos Todo[]
@@map("users")
}
model ClientGroup {
id String @id
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
userId String @map("user_id")
cvrVersion Int @map("cvr_version")
lastModified DateTime @default(now()) @updatedAt @map("last_modified")
clients Client[]
@@index([id, userId])
@@map("client_groups")
}
model Client {
id String @id
clientGroup ClientGroup @relation(fields: [clientGroupId], references: [id], onDelete: Cascade)
clientGroupId String @map("client_group_id")
lastMutationId Int @default(0) @map("last_mutation_id")
lastModified DateTime @default(now()) @updatedAt @map("last_modified")
@@index([id, clientGroupId])
@@map("clients")
}
model Todo {
id String @id
title String
completed Boolean @default(false)
userId String @map("user_id")
user User @relation(fields: [userId], references: [id])
rowVersion Int @default(0) @map("row_version")
@@map("todos")
}
If you are not sure what a clientGroup and a client are, we have already discussed them in detail in the Key terminologies section of Blog-3.
Let's go through the schema a bit
ClientGroup:
Refers to a browser profile and is scoped to a userId
Contains something called the CVR version, which we will discuss further when we learn about CVR
lastModified is basically an updatedAt column
Client:
A client refers to a browser tab under a browser profile
Scoped to a single client group
When a mutation happens, it is the client that sends a request to our server's push endpoint to sync it with the database. Hence we store the ID of the last mutation it triggered (lastMutationId) in our database; it is used in both push and pull
lastModified again is basically an updatedAt column
Todo:
Our business model/table
The main thing to notice in the Todo model is a column called rowVersion. For now just keep in mind that every entity/table you want to sync from the database to Replicache needs this rowVersion column. We will discuss it in detail when we learn about CVR and the pull endpoint
Make sure to increment the rowVersion by 1 every time a row that has a rowVersion column is updated
Make sure that for every dataset/table you want to sync with Replicache, there is a way on your server to check whether a particular user has access to that entity
For now just keep these pointers in mind. We will revisit them while discussing CVR and the pull endpoint, which will make these concepts crystal clear.
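One way to avoid forgetting the rowVersion bump is to build the update payload in a single place. The sketch below assumes a hypothetical `buildTodoUpdateData` helper and a `TodoChanges` type; `{ increment: 1 }` is Prisma's atomic number operation, so the increment happens in the database rather than in application code.

```typescript
// Hypothetical set of fields a todo update is allowed to change.
type TodoChanges = { title?: string; completed?: boolean };

// Builds the `data` argument for a Prisma `todo.update` call.
// Every update goes through here, so rowVersion is always bumped.
function buildTodoUpdateData(changes: TodoChanges) {
  return {
    ...changes,
    rowVersion: { increment: 1 }, // Prisma atomic number operation
  };
}

// Intended usage inside a transaction (not executed in this sketch):
// await tx.todo.update({ where: { id }, data: buildTodoUpdateData({ completed: true }) });
```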
Now the last piece of setup, I promise, before actually writing the push endpoint: a bit of a disclaimer about transactions.
All endpoints in a Replicache server setup have to be transactional with a high level of isolation (our setup uses serializable), and with a high level of isolation come deadlock errors.
The Replicache client keeps retrying failed "push" and "pull" requests to our server. But we should not rely on that alone; rather, we should retry on our server ourselves every time one of our request handlers throws such an error.
To see that logic, hop over to the prisma-client.ts file inside the models package.
import { Prisma, PrismaClient } from "@prisma/client";
import { logger } from "@repo/lib";
import { AppError } from "./err";
declare global {
// eslint-disable-next-line no-var
var prisma: PrismaClient | undefined;
}
export const prismaClient =
global.prisma ||
new PrismaClient({
log: ["query"],
});
if (process.env.NODE_ENV !== "production") global.prisma = prismaClient;
export type TransactionalPrismaClient = Parameters<Parameters<PrismaClient["$transaction"]>[0]>[0];
export default prismaClient;
export * from "@prisma/client";
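export async function transact<T>(body: (tx: TransactionalPrismaClient) => Promise<T>): Promise<T> {
  for (let i = 0; i < 10; i++) {
    try {
      const r = await prismaClient.$transaction(body, {
        isolationLevel: "Serializable",
      });
      return r;
    } catch (error) {
      // Only retry errors that a fresh attempt can fix
      if (shouldRetryTxn(error)) {
        logger.debug(`Transaction failed, retrying...${error}`);
        continue;
      }
      // Anything else (validation errors, bugs, ...) will fail again, so rethrow
      logger.error(`Transaction failed with a non-retryable error: ${error}`);
      throw error;
    }
  }
  throw new AppError({
    code: "INTERNAL_SERVER_ERROR",
    message: "Tried 10 times, but transaction still failed",
  });
}
function shouldRetryTxn(error: unknown) {
  if (error instanceof Prisma.PrismaClientKnownRequestError) {
    // P2034: write conflict or deadlock, which is what serializable isolation produces
    // P2002: unique constraint violation, e.g. two concurrent upserts racing each other
    return error.code === "P2034" || error.code === "P2002";
  }
  return false;
}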
So whenever we want to start a transaction, we use this transact function instead of a normal Prisma transaction, because it retries automatically in case of deadlock errors on the database. [read more about this topic https://www.postgresql.org/docs/16/transaction-iso.html]
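Stripped of Prisma, the retry idea looks roughly like the sketch below. The `withRetries` helper, its parameters and the default of 10 attempts are assumptions for illustration; the point is only that retryable errors trigger another attempt while everything else is rethrown immediately.

```typescript
// Runs `run` until it succeeds, retrying only errors that `isRetryable` accepts.
async function withRetries<T>(
  run: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 10,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await run();
    } catch (error) {
      // A non-retryable error will fail again, so surface it right away.
      if (!isRetryable(error)) throw error;
      lastError = error;
    }
  }
  // Every attempt failed with a retryable error: give up with the last one.
  throw lastError;
}
```

Keeping the "is this retryable?" decision in a separate predicate makes it easy to adjust which database error codes count as transient.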
Now let's hop back to the replicache.router.ts file inside the api app
import { Router } from "express";
import { pullRequestSchema, pushRequestSchema } from "@repo/models";
import { replicacheController } from "../controllers/replicache.controller";
import { authenticate } from "../middlewares/auth.middleware";
import validate from "../middlewares/validate.middleware";
export const replicacheRouter = Router({ mergeParams: true });
/**
* @method POST @url /replicache/push @desc push mutations to the server
*/
replicacheRouter.post(
"/push",
validate(pushRequestSchema),
authenticate,
replicacheController.push,
);
/**
* @method POST @url /replicache/pull @desc pull diff from the server
*/
replicacheRouter.post(
"/pull",
validate(pullRequestSchema),
authenticate,
replicacheController.pull,
);
Now let's go over this file a bit to explain the work I have done:
Here we create a Replicache router, which is later registered on our server
On the router I mount two routes, "push" and "pull"
On both routes I attach 2 middlewares:
validate --> if you check out validate, it takes in a schema and returns a middleware function that validates the request body, headers, params, query etc. against the defined schema and throws an error if validation fails
authenticate --> makes sure the user is authenticated, and is also responsible for putting the userId on the request object so it is handy to retrieve inside every route that requires an authenticated user
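The validate middleware itself is not shown in this post, so the following is only a guess at its general shape. The `Req`, `Res`, `Next` and `Schema` types are minimal stand-ins for the Express and zod types so that the sketch runs on its own, and it responds with a 400 where the real middleware may throw an AppError instead.

```typescript
// Minimal stand-ins for the Express and zod types used by the real middleware.
type Req = { body: unknown };
type Res = { status(code: number): Res; json(payload: unknown): void };
type Next = () => void;
type Schema = { safeParse(input: unknown): { success: boolean } };

// Higher-order function: takes a schema and returns a middleware.
function validate(schema: Schema) {
  return (req: Req, res: Res, next: Next): void => {
    // The schemas in this series wrap the payload in a `body` key.
    const result = schema.safeParse({ body: req.body });
    if (!result.success) {
      res.status(400).json({ message: "Invalid request" });
      return; // stop the chain: the controller never runs
    }
    next(); // valid request: hand over to the next middleware
  };
}
```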
Next, let's go to the controllers. In this blog we are only going to discuss the push controller.
Hop over to the replicache.controller.ts file
import { type NextFunction, type Request, type RequestHandler, type Response } from "express";
import {
AppError,
type PushRequestType,
transact,
} from "@repo/models";
import { logger } from "@repo/lib";
import { ReplicacheService } from "../services/replicache.service";
import { TodoService } from "../services/todo.service";
import { sendPoke } from "../utils/poke";
class ReplicacheController {
push: RequestHandler = async (
req: Request<object, object, PushRequestType["body"]>,
res: Response,
next: NextFunction,
) => {
const userId = req.user.id;
try {
const push = req.body;
for (const mutation of push.mutations) {
try {
await ReplicacheService.processMutation({
clientGroupID: push.clientGroupID,
errorMode: false,
mutation,
userId,
});
} catch (error) {
await ReplicacheService.processMutation({
clientGroupID: push.clientGroupID,
errorMode: true,
mutation,
userId,
});
}
}
return res.status(200).json({
success: true,
});
} catch (error) {
if (error instanceof AppError) {
return next(error);
}
logger.error(error);
return next(
new AppError({
code: "INTERNAL_SERVER_ERROR",
message:
"Failed to push data to the server, due to an internal error. Please try again later.",
}),
);
} finally {
await sendPoke({ userId });
}
};
}
export const replicacheController = new ReplicacheController();
In this file you can see that the push controller does very little, and most of the business logic is abstracted further into the service layer
Define the push method inside the controller class
Take out the properties that Replicache sends us in the push body (you can find them in the replicache.schema.ts file inside the models package)
import { z } from "zod";
const mutation = z.object({
  id: z.number(),
  clientID: z.string(),
  name: z.string(),
  args: z.any(),
});
export type MutationType = z.infer<typeof mutation>;
export const pushRequestSchema = z.object({
  body: z.object({
    profileID: z.string(),
    clientGroupID: z.string(),
    mutations: z.array(mutation),
    schemaVersion: z.string(),
  }),
});
export type PushRequestType = z.infer<typeof pushRequestSchema>;
const cookieSchema = z
  .object({
    order: z.number(),
    clientGroupID: z.string(),
  })
  .optional()
  .nullable();
export type PullCookie = z.infer<typeof cookieSchema>;
export const pullRequestSchema = z.object({
  body: z.object({
    profileID: z.string(),
    clientGroupID: z.string(),
    cookie: cookieSchema,
    schemaVersion: z.string(),
  }),
});
export type PullRequestType = z.infer<typeof pullRequestSchema>;
As you can see in this schema file, Replicache sends us mutations in batches, in the form of an array. Generally it is one mutation per push, but when the user performs mutations at a high frequency or has internet issues, Replicache queues up the mutations and sends them in batches. Either way, we need to handle all of those mutations
So inside our try block we loop through all the mutations and start another try/catch block inside each iteration
We call ReplicacheService.processMutation in both the try and the catch, the only difference being that in the try block we pass errorMode as false and in the catch block we pass errorMode as true
To give a brief understanding of how errorMode works: processMutation calls the transact function (which starts a transaction and retries it up to 10 times on deadlock errors). The first attempt runs with errorMode false. If it still fails because of a server error or database failure, we run the same function again inside the catch block with errorMode true. In that mode the mutator is skipped but the client's lastMutationId is still advanced, which stops Replicache from sending this specific mutation again and again, because we already tried it on the server and it did not work (more on this shortly when we write the actual processMutation function, and even more in blog-8 when we discuss error handling)
After that we simply send a success: true message to the client if everything worked out, else we do some error handling
And in the finally block we send a poke to the concerned client. As discussed in the intro to Replicache (Blog-3), sendPoke just sends an empty pub/sub event via web-sockets to the concerned clients, telling them that something has changed on the server and they need to pull
Just remember: every time you update something that is synced with Replicache, update the rowVersion and sendPoke to the connected clients so they can pull and see the changes immediately
For now knowing this much is enough. We will set up the poke functionality on both frontend and backend in blog-7
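The two-pass idea is easier to see in a stripped-down, synchronous form with no database involved. Everything in this sketch (`Mutation`, `ClientState`, `processOnce`, `pushOne`) is a hypothetical stand-in for the real controller and service code.

```typescript
type Mutation = { id: number; name: string };
type ClientState = { lastMutationId: number };

// Stand-in for processMutation: runs the mutator unless in error mode,
// then records the mutation as handled.
function processOnce(
  client: ClientState,
  mutation: Mutation,
  mutator: (mutation: Mutation) => void,
  errorMode: boolean,
): void {
  if (!errorMode) mutator(mutation); // may throw, in which case nothing is recorded
  client.lastMutationId = mutation.id;
}

// Stand-in for one iteration of the controller loop.
function pushOne(client: ClientState, mutation: Mutation, mutator: (mutation: Mutation) => void): void {
  try {
    processOnce(client, mutation, mutator, false);
  } catch {
    // Second pass: skip the mutator but still acknowledge the mutation,
    // so the client stops re-sending something that cannot succeed.
    processOnce(client, mutation, mutator, true);
  }
}
```

The trade-off is that a permanently failing mutation is acknowledged without its effects being applied, so the client's optimistic change is rolled back on its next pull.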
Let's finally dive deep into the processMutation function 🤯
Head over to the replicache.service.ts file and check out the processMutation function
import {
AppError,
type MutationType,
type TodoType,
transact,
} from "@repo/models";
import { logger } from "@repo/lib";
import { ClientService } from "./client.service";
import { ClientGroupService } from "./client-group.service";
import { serverMutators } from "../mutators";
export class ReplicacheService {
static async processMutation({
clientGroupID,
errorMode,
mutation,
userId,
}: {
userId: string;
clientGroupID: string;
mutation: MutationType;
errorMode: boolean;
}): Promise<void> {
await transact(async (tx) => {
logger.info(
`Processing mutation ${mutation.name} for user ${userId} in client group ${clientGroupID}`,
);
// 1. Instantiate client and client group services inside the transaction block
const clientGroupService = new ClientGroupService(tx);
const clientService = new ClientService(tx);
// 2. Fetch the base client group and client
const [baseClientGroup, baseClient] = await Promise.all([
clientGroupService.getById({
id: clientGroupID,
userId,
}),
clientService.getById({
id: mutation.clientID,
clientGroupId: clientGroupID,
}),
]);
// 3. calculate the next mutation id
const nextMutationId = baseClient.lastMutationId + 1;
// 4. Check if the mutation id is valid
//#region //*=========== Mutation id checks ===========
// 4.1. Check if the mutation id is less --> means already processed so just return
if (mutation.id < nextMutationId) {
logger.debug(`Skipping mutation ${mutation.id} because it has already been applied`);
return;
}
// 4.2. Check if the mutation id is greater --> means future mutation so throw error
if (mutation.id > nextMutationId) {
logger.error(
`Mutation ${mutation.id} is too far in the future, expected ${nextMutationId}`,
);
throw new AppError({
code: "INTERNAL_SERVER_ERROR",
message: `Mutation ${mutation.id} is too far in the future, expected ${nextMutationId}`,
});
}
//#endregion //*======== Mutation id checks ===========
// 5. Apply the mutation if not error mode
if (!errorMode) {
try {
// 5.1. Check if the mutation is valid
const mutationName = mutation.name as keyof typeof serverMutators;
const mutator = serverMutators[mutationName];
if (!mutator) {
logger.error(`Unknown mutation ${mutation.name}`);
throw new Error(`Unknown mutation ${mutation.name}`);
}
// 5.2. Apply the mutation
const args = mutation.args;
await mutator({
args,
userId,
tx,
});
} catch (error) {
if (error instanceof AppError) {
throw error;
}
logger.error(error);
throw new AppError({
code: "INTERNAL_SERVER_ERROR",
message: `Failed to apply mutation: ${mutation.name} to the server, due to an internal error. Please try again later.`,
});
}
}
// 6. Update the client with the new mutation id
await Promise.all([
clientGroupService.upsert({ ...baseClientGroup }),
clientService.upsert({
id: baseClient.id,
clientGroupId: baseClient.clientGroupId,
lastMutationId: nextMutationId,
}),
]);
logger.info(`Mutation ${mutation.id} applied successfully`);
});
}
}
In the code block I have already marked sections of code with steps and added a brief description of what each step does. Now let's do a deep dive into each step.
Step 1: Instantiate the client and client group services inside the transaction block. This is done so that the services use the tx object of the current transaction to perform DB operations, which makes all the DB operations part of the same transaction. If any step fails at any point, the whole transaction will roll back.
Step 2: Fetch baseClientGroup and baseClient. These are simple methods which don't need much explanation, so I am pasting the code for both client.service.ts and client-group.service.ts below.
client.service.ts
import {
  AppError,
  type Prisma,
  prismaClient,
  type SearchResult,
  type TransactionalPrismaClient,
} from "@repo/models";
/**
 * @description this class is to be used for transactional operations on the client table
 */
export class ClientService {
  constructor(private tx: TransactionalPrismaClient = prismaClient) {}
  async getById({ id, clientGroupId }: { id: string; clientGroupId: string }): Promise<
    Prisma.ClientGetPayload<{
      select: {
        id: true;
        clientGroupId: true;
        lastMutationId: true;
      };
    }>
  > {
    const client = await this.tx.client.findUnique({
      where: {
        id,
        clientGroupId,
      },
      select: {
        id: true,
        clientGroupId: true,
        lastMutationId: true,
      },
    });
    if (!client) {
      return {
        id,
        clientGroupId,
        lastMutationId: 0,
      };
    }
    if (client.clientGroupId !== clientGroupId) {
      throw new AppError({
        code: "UNAUTHORIZED",
        message: "You are not authorized to access this client",
      });
    }
    return client;
  }
}
/**
 * @description this constant is an instance of the ClientService class, and should be used for non transactional operations
 * on the client table
 */
export const clientService = new ClientService();
client-group.service.ts
import { AppError, type Prisma, prismaClient } from "@repo/models";
import { type TransactionalPrismaClient } from "@repo/models";
/**
 * @description this class is to be used for transactional operations on the clientGroup table
 */
export class ClientGroupService {
  constructor(private tx: TransactionalPrismaClient = prismaClient) {}
  async getById({ id, userId }: { id: string; userId: string }): Promise<
    Prisma.ClientGroupGetPayload<{
      select: {
        id: true;
        userId: true;
        cvrVersion: true;
      };
    }>
  > {
    const clientGroup = await this.tx.clientGroup.findUnique({
      where: {
        id,
        userId,
      },
      select: {
        id: true,
        userId: true,
        cvrVersion: true,
      },
    });
    if (!clientGroup) {
      return {
        id,
        userId,
        cvrVersion: 0,
      };
    }
    if (clientGroup.userId !== userId) {
      throw new AppError({
        code: "UNAUTHORIZED",
        message: "You are not authorized to access this client group",
      });
    }
    return clientGroup;
  }
}
/**
 * @description this constant is an instance of the ClientGroupService class, and should be used for non transactional operations
 */
export const clientGroupService = new ClientGroupService();
The interesting part about both getById methods is that they try to find the clientGroup and the client, and if they don't find one, rather than creating it they just return an object that looks like a newly created client/clientGroup. There is a very good reason behind that. Try playing around with the order here on your own and you will realise the issue. Let me give a hint: deadlock.
Step 3: This is simple. We take the baseClient and get its lastMutationId. It will be 0 if the client was not present earlier, or some number if it already existed in our DB. We increment it by 1 and expect the ID of the mutation we are currently executing to be exactly equal to it, since we maintain that state in the client table in our database.
Step 4 is a checker where we see:
If the current mutation ID is less than the nextMutationId calculated in step 3, the mutation has already been processed, so we just return in production, and in dev we also log (check how the logger works inside the lib package!). This rarely happens in production, for example when a user has two tabs open with the same account (same client group), or in dev mode.
If the current mutation ID is somehow more than the nextMutationId, that's a blunder: it means a mutation is somehow coming from the future that we have no track of. This never happens in production if your setup is right (which in this case it is).
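Steps 3 and 4 boil down to a three-way decision, which can be written as a small pure function. The `decideMutation` helper and the `MutationDecision` labels are hypothetical names for this sketch; the comparisons mirror the ones in processMutation.

```typescript
type MutationDecision = "already-processed" | "apply" | "from-the-future";

// Compares an incoming mutation ID with the last one recorded for the client.
function decideMutation(mutationId: number, lastMutationId: number): MutationDecision {
  const nextMutationId = lastMutationId + 1;
  if (mutationId < nextMutationId) return "already-processed"; // skip silently
  if (mutationId > nextMutationId) return "from-the-future"; // a gap: treat as an error
  return "apply"; // exactly the mutation we expect next
}
```

For a brand new client lastMutationId is 0, so the only mutation ID that gets applied is 1.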
Step 5 simply checks that we are not in errorMode, and if so:
5.1 Check that the mutation is valid by checking it against the set of server mutators we have as an object (yet to be written by us), with the help of the type-system we wrote in the last blog (you will see how easy writing mutators becomes with the type-system).
5.2 Apply the mutation by calling it with args, userId and the current tx object, so that whatever DB calls happen inside the mutator are part of the same transaction.
Step 6: Upsert the client and the client group.
Upsert the clientGroup. In this case we are not really updating anything, as you can see in step 6 of processMutation, because we just pass the baseClientGroup. The purpose of calling this function here is just to create the clientGroup if it does not exist, because remember, in the getById function we returned an object with default values rather than creating the clientGroup if it did not exist.
import { AppError, type Prisma, prismaClient } from "@repo/models";
import { type TransactionalPrismaClient } from "@repo/models";
/**
 * @description this class is to be used for transactional operations on the clientGroup table
 */
export class ClientGroupService {
  constructor(private tx: TransactionalPrismaClient = prismaClient) {}
  async upsert({ id, userId, cvrVersion }: { id: string; userId: string; cvrVersion: number }) {
    return await this.tx.clientGroup.upsert({
      where: {
        id,
        userId,
      },
      update: {
        lastModified: new Date(),
        cvrVersion,
      },
      create: {
        id,
        userId,
        cvrVersion,
        lastModified: new Date(),
      },
      select: {
        id: true,
        cvrVersion: true,
      },
    });
  }
}
/**
 * @description this constant is an instance of the ClientGroupService class, and should be used for non transactional operations
 */
export const clientGroupService = new ClientGroupService();
Upsert the client. As you can see in the processMutation function, we are updating the lastMutationID to nextMutationID. If the client does not exist, we create it; otherwise, we just update it.
import {
  AppError,
  type Prisma,
  prismaClient,
  type SearchResult,
  type TransactionalPrismaClient,
} from "@repo/models";
/**
 * @description this class is to be used for transactional operations on the client table
 */
export class ClientService {
  constructor(private tx: TransactionalPrismaClient = prismaClient) {}
  async upsert({
    id,
    clientGroupId,
    lastMutationId,
  }: {
    id: string;
    clientGroupId: string;
    lastMutationId: number;
  }) {
    await this.tx.client.upsert({
      where: {
        id,
        clientGroupId,
      },
      create: {
        id,
        clientGroupId,
        lastMutationId,
      },
      update: {
        lastMutationId,
      },
      select: {
        id: true,
      },
    });
  }
}
/**
 * @description this constant is an instance of the ClientService class, and should be used for non transactional operations
 * on the client table
 */
export const clientService = new ClientService();
The last piece of the puzzle for push: let's write the serverMutators themselves, which execute the business logic and actually sync the user's mutations to the backend database
Go to mutators/index.ts inside the api app
import { type M, type MutatorType } from "@repo/models";
import { todoMutators } from "./todo.mutator";
export const serverMutators: M<MutatorType.SERVER> = {
  ...todoMutators,
};
Here we create the actual serverMutators object from the type-system we wrote in the last blog. But what about the todoMutators, though?
Now go to todo.mutator.ts inside the same folder
import {
  AppError,
  type MutatorType,
  todoCreateSchema,
  todoDeleteSchema,
  type TodoMutators,
  todoUpdateSchema,
} from "@repo/models";
import { TodoService } from "../services/todo.service";
export const todoMutators: TodoMutators<MutatorType.SERVER> = {
  async todoCreate(body) {
    const { args, tx, userId } = body;
    const parsed = todoCreateSchema.safeParse(args);
    if (!parsed.success) {
      throw new AppError({
        code: "BAD_REQUEST",
        message: `Invalid request body, ${parsed.error.message}`,
      });
    }
    const todoService = new TodoService(tx);
    await todoService.create({ args: parsed.data, userId });
  },
  async todoUpdate(body) {
    const { args, tx, userId } = body;
    const parsed = todoUpdateSchema.safeParse(args);
    if (!parsed.success) {
      throw new AppError({
        code: "BAD_REQUEST",
        message: `Invalid request body, ${parsed.error.message}`,
      });
    }
    const todoService = new TodoService(tx);
    // Pass the validated data (not the raw args), same as in todoCreate
    await todoService.update({ args: parsed.data, userId });
  },
  async todoDelete(body) {
    const { args, tx, userId } = body;
    const parsed = todoDeleteSchema.safeParse(args);
    if (!parsed.success) {
      throw new AppError({
        code: "BAD_REQUEST",
        message: `Invalid request body, ${parsed.error.message}`,
      });
    }
    const todoService = new TodoService(tx);
    // Pass the validated data (not the raw args), same as in todoCreate
    await todoService.delete({ args: parsed.data, userId });
  },
};
As you can see, here we define all the mutators that are registered with Replicache on the frontend. All of them follow the same template:
Get args, tx and userId from the body of the mutation
Parse args with the schema you wrote
Instantiate the service with tx to make it part of the same transaction
Call the service's relevant business method, like create, update or delete
You can hop over to todo.service.ts and check out the methods. They are quite straightforward, so I am not covering them here.
So finally we have set up our push endpoint and the backend mutators. Let's proceed with setting up the pull endpoint in the next blog.
Written by
Ronit Panda
Founding full stack engineer at dimension.dev