Kira is building a collaborative workspace app (think Notion-lite) for small teams. Users need to see each other's edits in real-time, know who's viewing what, and never lose data even during server restarts. The system needs to handle 500 concurrent users editing 100+ documents simultaneously, with reliable notifications when someone is mentioned or assigned a task.
Step 1: Document Storage with Redis OM
// models/document.ts — Fast document storage with full-text search
import { Client, Schema, Repository } from "redis-om";
const client = await new Client().open(process.env.REDIS_URL);
const documentSchema = new Schema("document", {
title: { type: "text" }, // Full-text searchable
content: { type: "string" }, // JSON string of document tree
workspaceId: { type: "string" },
createdBy: { type: "string" },
lastEditedBy: { type: "string" },
updatedAt: { type: "date", sortable: true },
tags: { type: "string[]" },
});
const docRepo = new Repository(documentSchema, client);
await docRepo.createIndex();
// Save document with sub-millisecond latency
async function saveDocument(docId: string, content: any, editedBy: string) {
await docRepo.save(docId, {
...await docRepo.fetch(docId),
content: JSON.stringify(content),
lastEditedBy: editedBy,
updatedAt: new Date(),
});
}
// Search across all documents
async function searchDocuments(workspaceId: string, query: string) {
return docRepo.search()
.where("workspaceId").eq(workspaceId)
.and("title").matches(query)
.sortBy("updatedAt", "DESC")
.page(0, 20)
.return.all();
}
Step 2: Real-Time Sync via Centrifugo
// services/realtime.ts — WebSocket connections for live editing
import jwt from "jsonwebtoken";
// Generate Centrifugo token for authenticated user
function generateRealtimeToken(userId: string, workspaceId: string) {
return jwt.sign(
{
sub: userId,
channels: [`doc:${workspaceId}:*`], // Can subscribe to any doc in workspace
},
process.env.CENTRIFUGO_SECRET!,
{ expiresIn: "24h" },
);
}
// Publish edit operation to all viewers
async function broadcastEdit(docId: string, operation: EditOperation) {
await fetch(`${process.env.CENTRIFUGO_URL}/api/publish`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `apikey ${process.env.CENTRIFUGO_API_KEY}`,
},
body: JSON.stringify({
channel: `doc:${operation.workspaceId}:${docId}`,
data: {
type: "edit",
userId: operation.userId,
operations: operation.ops, // OT or CRDT operations
version: operation.version,
timestamp: Date.now(),
},
}),
});
}
// Broadcast cursor position
async function broadcastCursor(docId: string, workspaceId: string, userId: string, cursor: CursorPosition) {
await fetch(`${process.env.CENTRIFUGO_URL}/api/publish`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `apikey ${process.env.CENTRIFUGO_API_KEY}`,
},
body: JSON.stringify({
channel: `doc:${workspaceId}:${docId}`,
data: { type: "cursor", userId, position: cursor },
}),
});
}
Step 3: Reliable Event Processing with RabbitMQ
// services/events.ts — Reliable async processing via message queue
import amqp from "amqplib";
async function setupEventProcessing() {
const connection = await amqp.connect(process.env.RABBITMQ_URL!);
const channel = await connection.createChannel();
// Document events queue
await channel.assertExchange("doc-events", "topic", { durable: true });
await channel.assertQueue("doc-notifications", {
durable: true,
arguments: { "x-dead-letter-exchange": "dlx" },
});
await channel.bindQueue("doc-notifications", "doc-events", "doc.mention.*");
await channel.bindQueue("doc-notifications", "doc-events", "doc.assign.*");
// Process notifications reliably
await channel.prefetch(10);
channel.consume("doc-notifications", async (msg) => {
if (!msg) return;
try {
const event = JSON.parse(msg.content.toString());
if (msg.fields.routingKey.startsWith("doc.mention")) {
await sendMentionNotification(event); // Email + push
await broadcastNotification(event.userId, { // Real-time via Centrifugo
type: "mention", docId: event.docId, mentionedBy: event.mentionedBy,
});
}
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false); // To DLX on failure
}
});
}
// Publish event when user mentions someone
async function publishMention(docId: string, mentionedBy: string, mentionedUser: string) {
const channel = await getChannel();
channel.publish("doc-events", `doc.mention.${mentionedUser}`,
Buffer.from(JSON.stringify({ docId, mentionedBy, userId: mentionedUser, timestamp: Date.now() })),
{ persistent: true },
);
}
Step 4: Type-Safe Business Logic with Effect
// services/document-service.ts — Type-safe operations with error handling
import { Effect, pipe } from "effect";
class DocumentNotFound extends Error { readonly _tag = "DocumentNotFound"; }
class PermissionDenied extends Error { readonly _tag = "PermissionDenied"; }
class ConcurrentEditConflict extends Error { readonly _tag = "ConcurrentEditConflict"; }
function editDocument(docId: string, userId: string, ops: EditOp[], version: number) {
return Effect.gen(function* () {
// Check document exists
const doc = yield* Effect.tryPromise({
try: () => docRepo.fetch(docId),
catch: () => new DocumentNotFound(),
});
if (!doc) yield* Effect.fail(new DocumentNotFound());
// Check permissions
const hasAccess = yield* checkPermission(userId, docId, "edit");
if (!hasAccess) yield* Effect.fail(new PermissionDenied());
// Apply operations (with version check)
const applied = yield* applyOperations(doc, ops, version);
// Save + broadcast + queue notification (all must succeed)
yield* Effect.all([
Effect.tryPromise(() => saveDocument(docId, applied.content, userId)),
Effect.tryPromise(() => broadcastEdit(docId, { workspaceId: doc.workspaceId, userId, ops, version: applied.version })),
Effect.tryPromise(() => queueMentionEvents(docId, userId, ops)),
], { concurrency: 3 });
return applied;
});
}
// Handle errors at the API layer
const handleEdit = pipe(
editDocument(docId, userId, ops, version),
Effect.catchTags({
DocumentNotFound: () => Effect.succeed({ status: 404, error: "Document not found" }),
PermissionDenied: () => Effect.succeed({ status: 403, error: "No edit permission" }),
ConcurrentEditConflict: () => Effect.succeed({ status: 409, error: "Version conflict, please retry" }),
}),
);
Results
After 3 months in production with 200 active teams:
- Latency: Edit operations broadcast in <50ms via Centrifugo WebSocket; users feel instant sync
- Reliability: Zero lost edits; RabbitMQ ensures notification delivery even during deployments
- Search: Full-text document search in <5ms via Redis OM; users find any doc instantly
- Concurrent editing: 50+ simultaneous editors on a single document; cursor positions update in real-time
- Notification delivery: 99.8% successful; dead-letter queue catches failures for retry
- Type safety: Effect catches 12 potential error paths at compile time; zero uncaught exceptions in production
- Scale: 500 concurrent WebSocket connections on a single Centrifugo instance; horizontal scaling ready