[TERMINAL · SKILLS]
> mounting /skills...
> indexing 295 manifests...
> linking agents: claude · codex · gemini · cursor
> ready.
[░░░░░░░░░░░░░░░░░░░░░░░░░░░░] 0%
Terminal.skills
Use Cases/Build an Event Bus System

Build an Event Bus System

Build an event bus system with pub/sub messaging, event schema registry, dead letter handling, consumer groups, replay capability, and monitoring for decoupled microservice communication.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor

Skills stack · 5 skills

Avg quality 93/100·All SAFE
>

typescript

v

Not yet scored
View skill
>

redis

v1.0.0

Build applications with Redis — caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

93/100 quality
1.81× impact
SAFE
View skill
>

postgresql

v1.0.0

Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

87/100 quality
1.53× impact
SAFE
View skill
>

hono

v1.0.0

You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge — with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

93/100 quality
3.00× impact
SAFE
View skill
>

zod

v1.0.0

You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

100/100 quality
1.21× impact
SAFE
View skill
$

The Problem

Oliver leads backend at a 25-person company with 10 microservices. Services communicate via direct HTTP calls — the order service calls the inventory service, which calls the notification service. When inventory is down, orders fail. Adding a new consumer (analytics) requires changing the order service code. Event format changes break downstream services silently. There's no replay capability — if a consumer was down during an event, the event is lost. They need an event bus: publish once, multiple consumers subscribe, schema validation, dead letter for failures, replay for recovery, and consumer group management.

Step 1: Build the Event Bus

typescript
import { pool } from "../db";
import { Redis } from "ioredis";
import { randomBytes, createHash } from "node:crypto";
import { z, ZodSchema } from "zod";
const redis = new Redis(process.env.REDIS_URL!);

interface Event { id: string; type: string; version: number; source: string; data: any; metadata: { correlationId: string; timestamp: string; }; }
interface Subscription { id: string; eventType: string; consumerGroup: string; handler: (event: Event) => Promise<void>; maxRetries: number; }
interface EventSchema { type: string; version: number; schema: ZodSchema; }

const subscriptions = new Map<string, Subscription[]>();
const schemas = new Map<string, EventSchema>();

// Register event schema
export function registerSchema(type: string, version: number, schema: ZodSchema): void {
  schemas.set(`${type}:${version}`, { type, version, schema });
}

// Publish event
export async function publish(type: string, data: any, source: string, correlationId?: string): Promise<string> {
  const schemaKey = `${type}:1`;
  const schema = schemas.get(schemaKey);
  if (schema) {
    const result = schema.schema.safeParse(data);
    if (!result.success) throw new Error(`Event validation failed: ${result.error.message}`);
  }

  const event: Event = {
    id: `evt-${randomBytes(8).toString("hex")}`, type, version: 1, source,
    data, metadata: { correlationId: correlationId || randomBytes(8).toString("hex"), timestamp: new Date().toISOString() },
  };

  // Store event for replay
  await pool.query(
    `INSERT INTO event_log (id, type, version, source, data, metadata, created_at) VALUES ($1, $2, $3, $4, $5, $6, NOW())`,
    [event.id, type, 1, source, JSON.stringify(data), JSON.stringify(event.metadata)]
  );

  // Publish to Redis for real-time delivery
  await redis.publish(`events:${type}`, JSON.stringify(event));

  // Also push to per-consumer-group queues for reliable delivery
  const subs = subscriptions.get(type) || [];
  const groups = new Set(subs.map((s) => s.consumerGroup));
  for (const group of groups) {
    await redis.rpush(`eventbus:${type}:${group}`, JSON.stringify(event));
  }

  await redis.hincrby("eventbus:stats", "published", 1);
  await redis.hincrby(`eventbus:stats:${type}`, "count", 1);

  return event.id;
}

// Subscribe to events
export function subscribe(eventType: string, consumerGroup: string, handler: (event: Event) => Promise<void>, maxRetries: number = 3): string {
  const id = `sub-${randomBytes(4).toString("hex")}`;
  const sub: Subscription = { id, eventType, consumerGroup, handler, maxRetries };
  if (!subscriptions.has(eventType)) subscriptions.set(eventType, []);
  subscriptions.get(eventType)!.push(sub);
  // Start consumer loop
  consumeLoop(eventType, consumerGroup, sub).catch(() => {});
  return id;
}

async function consumeLoop(eventType: string, group: string, sub: Subscription): Promise<void> {
  while (true) {
    const raw = await redis.blpop(`eventbus:${eventType}:${group}`, 5);
    if (!raw) continue;
    const event: Event = JSON.parse(raw[1]);

    let attempt = 0;
    let success = false;
    while (attempt <= sub.maxRetries && !success) {
      try {
        await sub.handler(event);
        success = true;
        await redis.hincrby("eventbus:stats", "consumed", 1);
      } catch (e: any) {
        attempt++;
        if (attempt > sub.maxRetries) {
          // Dead letter
          await redis.rpush(`eventbus:dlq:${eventType}:${group}`, JSON.stringify({ event, error: e.message, failedAt: new Date().toISOString(), attempts: attempt }));
          await redis.hincrby("eventbus:stats", "dead", 1);
        } else {
          await new Promise((r) => setTimeout(r, 1000 * Math.pow(2, attempt)));
        }
      }
    }
  }
}

// Replay events (e.g., after consumer recovery)
export async function replay(eventType: string, consumerGroup: string, since: string): Promise<number> {
  const { rows } = await pool.query(
    "SELECT * FROM event_log WHERE type = $1 AND created_at >= $2 ORDER BY created_at ASC",
    [eventType, since]
  );
  for (const row of rows) {
    await redis.rpush(`eventbus:${eventType}:${consumerGroup}`, JSON.stringify({
      ...row, data: JSON.parse(row.data), metadata: JSON.parse(row.metadata),
    }));
  }
  return rows.length;
}

// Retry dead letter events
export async function retryDLQ(eventType: string, consumerGroup: string): Promise<number> {
  let retried = 0;
  while (true) {
    const raw = await redis.lpop(`eventbus:dlq:${eventType}:${consumerGroup}`);
    if (!raw) break;
    const { event } = JSON.parse(raw);
    await redis.rpush(`eventbus:${eventType}:${consumerGroup}`, JSON.stringify(event));
    retried++;
  }
  return retried;
}

// Monitoring
export async function getStats(): Promise<{ published: number; consumed: number; dead: number; queueDepths: Record<string, number> }> {
  const stats = await redis.hgetall("eventbus:stats");
  const queueKeys = await redis.keys("eventbus:*:*");
  const depths: Record<string, number> = {};
  for (const key of queueKeys) {
    if (!key.includes("dlq") && !key.includes("stats")) {
      depths[key] = await redis.llen(key);
    }
  }
  return { published: parseInt(stats.published || "0"), consumed: parseInt(stats.consumed || "0"), dead: parseInt(stats.dead || "0"), queueDepths: depths };
}

Results

  • Inventory down ≠ orders fail — order service publishes order.created event; inventory consumes when it's back up; decoupled; no cascading failures
  • New consumer in 5 minutes — analytics team subscribes to order.created; no changes to order service; zero coordination needed
  • Schema validationorder.created must have orderId, amount, currency; publisher with missing field gets immediate error; no silent downstream failures
  • Event replay — analytics consumer was down for 2 hours; replay('order.created', 'analytics', '2h ago') re-queues 500 missed events; zero data loss
  • Dead letter queue — notification service fails on malformed email; event moves to DLQ after 3 retries; ops fixes handler, retries DLQ; no lost notifications