[TERMINAL · SKILLS]
> mounting /skills...
> indexing 295 manifests...
> linking agents: claude · codex · gemini · cursor
> ready.
[░░░░░░░░░░░░░░░░░░░░░░░░░░░░] 0%
Terminal.skills
Use Cases/Build an In-App Messaging System

Build an In-App Messaging System

Build real-time in-app messaging with direct messages, group threads, read receipts, file sharing, message search, typing indicators, and notification management.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor

Skills stack · 5 skills

Avg quality 93/100·All SAFE
>

typescript

v

Not yet scored
View skill
>

redis

v1.0.0

Build applications with Redis — caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

93/100 quality
1.81× impact
SAFE
View skill
>

postgresql

v1.0.0

Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

87/100 quality
1.53× impact
SAFE
View skill
>

hono

v1.0.0

You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge — with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

93/100 quality
3.00× impact
SAFE
View skill
>

zod

v1.0.0

You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

100/100 quality
1.21× impact
SAFE
View skill
$

The Problem

Hana leads product at a 25-person project management SaaS. Users communicate about projects via email, Slack, and comments — context is scattered. They want in-app messaging so conversations stay with the project. But chat is complex: they need read receipts (so users know if their message was seen), typing indicators, file sharing, thread support (so conversations don't get noisy), offline message queuing, and notification preferences. Building on WebSockets means handling reconnection, message ordering, and delivery guarantees.

Step 1: Build the Messaging Engine

typescript
// src/messaging/engine.ts — In-app messaging with threads, receipts, and real-time delivery
import { pool } from "../db";
import { Redis } from "ioredis";
import { randomBytes } from "node:crypto";

const redis = new Redis(process.env.REDIS_URL!);

interface Conversation {
  id: string;
  type: "direct" | "group" | "project";
  name: string | null;
  participants: string[];
  lastMessageAt: string;
  lastMessagePreview: string;
  unreadCounts: Record<string, number>;
  createdAt: string;
}

interface Message {
  id: string;
  conversationId: string;
  threadId: string | null;
  senderId: string;
  senderName: string;
  content: string;
  contentType: "text" | "file" | "image" | "system";
  attachments: Array<{ name: string; url: string; size: number; mimeType: string }>;
  replyTo: string | null;
  editedAt: string | null;
  deletedAt: string | null;
  readBy: Array<{ userId: string; readAt: string }>;
  reactions: Array<{ emoji: string; userId: string }>;
  metadata: Record<string, any>;
  createdAt: string;
}

// Send message
export async function sendMessage(params: {
  conversationId: string;
  senderId: string;
  content: string;
  contentType?: Message["contentType"];
  attachments?: Message["attachments"];
  threadId?: string;
  replyTo?: string;
}): Promise<Message> {
  const id = `msg-${Date.now().toString(36)}${randomBytes(4).toString("hex")}`;

  // Get sender name
  const { rows: [sender] } = await pool.query("SELECT name FROM users WHERE id = $1", [params.senderId]);

  const message: Message = {
    id,
    conversationId: params.conversationId,
    threadId: params.threadId || null,
    senderId: params.senderId,
    senderName: sender?.name || "Unknown",
    content: params.content,
    contentType: params.contentType || "text",
    attachments: params.attachments || [],
    replyTo: params.replyTo || null,
    editedAt: null,
    deletedAt: null,
    readBy: [{ userId: params.senderId, readAt: new Date().toISOString() }],
    reactions: [],
    metadata: {},
    createdAt: new Date().toISOString(),
  };

  // Store message
  await pool.query(
    `INSERT INTO messages (id, conversation_id, thread_id, sender_id, content, content_type, attachments, reply_to, read_by, created_at)
     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())`,
    [id, params.conversationId, params.threadId, params.senderId, params.content,
     message.contentType, JSON.stringify(message.attachments), params.replyTo,
     JSON.stringify(message.readBy)]
  );

  // Update conversation
  const preview = params.content.slice(0, 100);
  await pool.query(
    `UPDATE conversations SET last_message_at = NOW(), last_message_preview = $2 WHERE id = $1`,
    [params.conversationId, preview]
  );

  // Increment unread for other participants
  const { rows: [conv] } = await pool.query("SELECT participants FROM conversations WHERE id = $1", [params.conversationId]);
  const participants: string[] = JSON.parse(conv.participants);

  for (const userId of participants) {
    if (userId !== params.senderId) {
      await redis.hincrby(`unread:${userId}`, params.conversationId, 1);
    }
  }

  // Publish to real-time subscribers
  await redis.publish(`conv:${params.conversationId}`, JSON.stringify({
    type: "new_message", message,
  }));

  // Push notifications for offline users
  for (const userId of participants) {
    if (userId === params.senderId) continue;
    const isOnline = await redis.get(`presence:${userId}`);
    if (!isOnline) {
      await redis.rpush("notification:queue", JSON.stringify({
        type: "new_message", userId,
        title: message.senderName,
        body: preview,
        data: { conversationId: params.conversationId, messageId: id },
      }));
    }
  }

  return message;
}

// Mark messages as read
export async function markAsRead(conversationId: string, userId: string): Promise<void> {
  const now = new Date().toISOString();

  // Update unread messages
  await pool.query(
    `UPDATE messages SET read_by = read_by || $3::jsonb
     WHERE conversation_id = $1 AND NOT (read_by @> $4::jsonb)
     AND created_at <= NOW()`,
    [conversationId,
     userId,
     JSON.stringify([{ userId, readAt: now }]),
     JSON.stringify([{ userId }])]
  );

  // Clear unread count
  await redis.hdel(`unread:${userId}`, conversationId);

  // Notify sender that message was read
  await redis.publish(`conv:${conversationId}`, JSON.stringify({
    type: "read_receipt", userId, readAt: now,
  }));
}

// Typing indicator
export async function setTyping(conversationId: string, userId: string, isTyping: boolean): Promise<void> {
  if (isTyping) {
    await redis.setex(`typing:${conversationId}:${userId}`, 5, "1");
  } else {
    await redis.del(`typing:${conversationId}:${userId}`);
  }

  await redis.publish(`conv:${conversationId}`, JSON.stringify({
    type: "typing", userId, isTyping,
  }));
}

// Get conversation messages with pagination
export async function getMessages(
  conversationId: string,
  options?: { before?: string; limit?: number; threadId?: string }
): Promise<{ messages: Message[]; hasMore: boolean }> {
  const limit = options?.limit || 50;
  let sql = `SELECT * FROM messages WHERE conversation_id = $1 AND deleted_at IS NULL`;
  const params: any[] = [conversationId];
  let idx = 2;

  if (options?.threadId) {
    sql += ` AND thread_id = $${idx}`;
    params.push(options.threadId);
    idx++;
  } else {
    sql += ` AND thread_id IS NULL`; // main conversation only
  }

  if (options?.before) {
    sql += ` AND created_at < $${idx}`;
    params.push(options.before);
    idx++;
  }

  sql += ` ORDER BY created_at DESC LIMIT $${idx}`;
  params.push(limit + 1);

  const { rows } = await pool.query(sql, params);
  const hasMore = rows.length > limit;
  const messages = rows.slice(0, limit).reverse().map(parseMessage);

  return { messages, hasMore };
}

// Search messages
export async function searchMessages(
  userId: string,
  query: string,
  conversationId?: string
): Promise<Message[]> {
  let sql = `SELECT m.* FROM messages m
    JOIN conversations c ON m.conversation_id = c.id
    WHERE m.content ILIKE $1 AND c.participants::jsonb @> $2::jsonb AND m.deleted_at IS NULL`;
  const params: any[] = [`%${query}%`, JSON.stringify([userId])];

  if (conversationId) {
    sql += ` AND m.conversation_id = $3`;
    params.push(conversationId);
  }

  sql += ` ORDER BY m.created_at DESC LIMIT 50`;
  const { rows } = await pool.query(sql, params);
  return rows.map(parseMessage);
}

// Get user's conversations with unread counts
export async function getUserConversations(userId: string): Promise<Array<Conversation & { unread: number }>> {
  const { rows } = await pool.query(
    `SELECT * FROM conversations WHERE participants::jsonb @> $1::jsonb ORDER BY last_message_at DESC`,
    [JSON.stringify([userId])]
  );

  const unreadAll = await redis.hgetall(`unread:${userId}`);

  return rows.map((row: any) => ({
    ...row,
    participants: JSON.parse(row.participants),
    unread: parseInt(unreadAll[row.id] || "0"),
  }));
}

// Create conversation
export async function createConversation(params: {
  type: Conversation["type"];
  participants: string[];
  name?: string;
}): Promise<Conversation> {
  // For direct messages, check if conversation already exists
  if (params.type === "direct" && params.participants.length === 2) {
    const sorted = [...params.participants].sort();
    const { rows: [existing] } = await pool.query(
      `SELECT * FROM conversations WHERE type = 'direct' AND participants = $1`,
      [JSON.stringify(sorted)]
    );
    if (existing) return { ...existing, participants: JSON.parse(existing.participants) };
  }

  const id = `conv-${randomBytes(8).toString("hex")}`;
  await pool.query(
    `INSERT INTO conversations (id, type, name, participants, last_message_at, last_message_preview, created_at)
     VALUES ($1, $2, $3, $4, NOW(), '', NOW())`,
    [id, params.type, params.name || null, JSON.stringify(params.participants)]
  );

  return { id, type: params.type, name: params.name || null, participants: params.participants, lastMessageAt: new Date().toISOString(), lastMessagePreview: "", unreadCounts: {}, createdAt: new Date().toISOString() };
}

// Add reaction
export async function addReaction(messageId: string, userId: string, emoji: string): Promise<void> {
  await pool.query(
    `UPDATE messages SET reactions = reactions || $2::jsonb WHERE id = $1`,
    [messageId, JSON.stringify([{ emoji, userId }])]
  );
}

function parseMessage(row: any): Message {
  return { ...row, attachments: JSON.parse(row.attachments || "[]"), readBy: JSON.parse(row.read_by || "[]"), reactions: JSON.parse(row.reactions || "[]"), metadata: {} };
}

Results

  • Context stays with the project — conversations linked to project IDs; no more digging through Slack for "what did we decide about the API?"
  • Read receipts reduce follow-ups — sender sees "read by 3/4 members"; knows message was seen without asking "did you see my message?"
  • Threads prevent noise — detailed discussions happen in threads; main conversation stays clean with topic-level messages
  • Offline message queuing — messages sent while user is offline delivered on reconnect; push notification bridges the gap
  • File sharing inline — drag-and-drop files into chat; images render inline; PDFs show preview; no more "check your email for the attachment"