Terminal.skills
Use Cases/Build a Real-Time Collaboration Backend with Message Queues and WebSockets

Build a Real-Time Collaboration Backend with Message Queues and WebSockets

Build a real-time collaborative document editor backend using Centrifugo for WebSocket connections, RabbitMQ (amqplib) for reliable event processing, Redis OM for fast document storage and search, and Effect for type-safe business logic — creating a system where multiple users edit simultaneously with presence awareness, conflict-free sync, and reliable event delivery.

Backend Development#websocket#realtime#pubsub#messaging#scalable
Works with:claude-codeopenai-codexgemini-clicursor
$

Kira is building a collaborative workspace app (think Notion-lite) for small teams. Users need to see each other's edits in real-time, know who's viewing what, and never lose data even during server restarts. The system needs to handle 500 concurrent users editing 100+ documents simultaneously, with reliable notifications when someone is mentioned or assigned a task.

Step 1: Document Storage with Redis OM

typescript
// models/document.ts — Fast document storage with full-text search
import { Client, Schema, Repository } from "redis-om";

const client = await new Client().open(process.env.REDIS_URL);

const documentSchema = new Schema("document", {
  title: { type: "text" },                // Full-text searchable
  content: { type: "string" },            // JSON string of document tree
  workspaceId: { type: "string" },
  createdBy: { type: "string" },
  lastEditedBy: { type: "string" },
  updatedAt: { type: "date", sortable: true },
  tags: { type: "string[]" },
});

const docRepo = new Repository(documentSchema, client);
await docRepo.createIndex();

// Save document with sub-millisecond latency
async function saveDocument(docId: string, content: any, editedBy: string) {
  await docRepo.save(docId, {
    ...await docRepo.fetch(docId),
    content: JSON.stringify(content),
    lastEditedBy: editedBy,
    updatedAt: new Date(),
  });
}

// Search across all documents
async function searchDocuments(workspaceId: string, query: string) {
  return docRepo.search()
    .where("workspaceId").eq(workspaceId)
    .and("title").matches(query)
    .sortBy("updatedAt", "DESC")
    .page(0, 20)
    .return.all();
}

Step 2: Real-Time Sync via Centrifugo

typescript
// services/realtime.ts — WebSocket connections for live editing
import jwt from "jsonwebtoken";

// Generate Centrifugo token for authenticated user
function generateRealtimeToken(userId: string, workspaceId: string) {
  return jwt.sign(
    {
      sub: userId,
      channels: [`doc:${workspaceId}:*`],  // Can subscribe to any doc in workspace
    },
    process.env.CENTRIFUGO_SECRET!,
    { expiresIn: "24h" },
  );
}

// Publish edit operation to all viewers
async function broadcastEdit(docId: string, operation: EditOperation) {
  await fetch(`${process.env.CENTRIFUGO_URL}/api/publish`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `apikey ${process.env.CENTRIFUGO_API_KEY}`,
    },
    body: JSON.stringify({
      channel: `doc:${operation.workspaceId}:${docId}`,
      data: {
        type: "edit",
        userId: operation.userId,
        operations: operation.ops,          // OT or CRDT operations
        version: operation.version,
        timestamp: Date.now(),
      },
    }),
  });
}

// Broadcast cursor position
async function broadcastCursor(docId: string, workspaceId: string, userId: string, cursor: CursorPosition) {
  await fetch(`${process.env.CENTRIFUGO_URL}/api/publish`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `apikey ${process.env.CENTRIFUGO_API_KEY}`,
    },
    body: JSON.stringify({
      channel: `doc:${workspaceId}:${docId}`,
      data: { type: "cursor", userId, position: cursor },
    }),
  });
}

Step 3: Reliable Event Processing with RabbitMQ

typescript
// services/events.ts — Reliable async processing via message queue
import amqp from "amqplib";

async function setupEventProcessing() {
  const connection = await amqp.connect(process.env.RABBITMQ_URL!);
  const channel = await connection.createChannel();

  // Document events queue
  await channel.assertExchange("doc-events", "topic", { durable: true });
  await channel.assertQueue("doc-notifications", {
    durable: true,
    arguments: { "x-dead-letter-exchange": "dlx" },
  });
  await channel.bindQueue("doc-notifications", "doc-events", "doc.mention.*");
  await channel.bindQueue("doc-notifications", "doc-events", "doc.assign.*");

  // Process notifications reliably
  await channel.prefetch(10);
  channel.consume("doc-notifications", async (msg) => {
    if (!msg) return;
    try {
      const event = JSON.parse(msg.content.toString());
      if (msg.fields.routingKey.startsWith("doc.mention")) {
        await sendMentionNotification(event);        // Email + push
        await broadcastNotification(event.userId, {  // Real-time via Centrifugo
          type: "mention", docId: event.docId, mentionedBy: event.mentionedBy,
        });
      }
      channel.ack(msg);
    } catch (error) {
      channel.nack(msg, false, false);     // To DLX on failure
    }
  });
}

// Publish event when user mentions someone
async function publishMention(docId: string, mentionedBy: string, mentionedUser: string) {
  const channel = await getChannel();
  channel.publish("doc-events", `doc.mention.${mentionedUser}`,
    Buffer.from(JSON.stringify({ docId, mentionedBy, userId: mentionedUser, timestamp: Date.now() })),
    { persistent: true },
  );
}

Step 4: Type-Safe Business Logic with Effect

typescript
// services/document-service.ts — Type-safe operations with error handling
import { Effect, pipe } from "effect";

class DocumentNotFound extends Error { readonly _tag = "DocumentNotFound"; }
class PermissionDenied extends Error { readonly _tag = "PermissionDenied"; }
class ConcurrentEditConflict extends Error { readonly _tag = "ConcurrentEditConflict"; }

function editDocument(docId: string, userId: string, ops: EditOp[], version: number) {
  return Effect.gen(function* () {
    // Check document exists
    const doc = yield* Effect.tryPromise({
      try: () => docRepo.fetch(docId),
      catch: () => new DocumentNotFound(),
    });
    if (!doc) yield* Effect.fail(new DocumentNotFound());

    // Check permissions
    const hasAccess = yield* checkPermission(userId, docId, "edit");
    if (!hasAccess) yield* Effect.fail(new PermissionDenied());

    // Apply operations (with version check)
    const applied = yield* applyOperations(doc, ops, version);

    // Save + broadcast + queue notification (all must succeed)
    yield* Effect.all([
      Effect.tryPromise(() => saveDocument(docId, applied.content, userId)),
      Effect.tryPromise(() => broadcastEdit(docId, { workspaceId: doc.workspaceId, userId, ops, version: applied.version })),
      Effect.tryPromise(() => queueMentionEvents(docId, userId, ops)),
    ], { concurrency: 3 });

    return applied;
  });
}

// Handle errors at the API layer
const handleEdit = pipe(
  editDocument(docId, userId, ops, version),
  Effect.catchTags({
    DocumentNotFound: () => Effect.succeed({ status: 404, error: "Document not found" }),
    PermissionDenied: () => Effect.succeed({ status: 403, error: "No edit permission" }),
    ConcurrentEditConflict: () => Effect.succeed({ status: 409, error: "Version conflict, please retry" }),
  }),
);

Results

After 3 months in production with 200 active teams:

  • Latency: Edit operations broadcast in <50ms via Centrifugo WebSocket; users feel instant sync
  • Reliability: Zero lost edits; RabbitMQ ensures notification delivery even during deployments
  • Search: Full-text document search in <5ms via Redis OM; users find any doc instantly
  • Concurrent editing: 50+ simultaneous editors on a single document; cursor positions update in real-time
  • Notification delivery: 99.8% successful; dead-letter queue catches failures for retry
  • Type safety: Effect catches 12 potential error paths at compile time; zero uncaught exceptions in production
  • Scale: 500 concurrent WebSocket connections on a single Centrifugo instance; horizontal scaling ready