Terminal.skills
Use Cases/Build a User Activity Feed with Fanout

Build a User Activity Feed with Fanout

Build a scalable activity feed system using fanout-on-write for fast reads, with support for follow graphs, aggregation, and real-time updates — powering social features in any application.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

Mika leads product at a 30-person developer platform with 45,000 users. They want to add an activity feed — showing what people you follow are building, deploying, and sharing. The naive approach (query all followed users' activities on every page load) takes 2-3 seconds with 500+ follows. Twitter, GitHub, and LinkedIn use "fanout-on-write" — when someone posts, their activity is pushed to all followers' feeds immediately. Reading a feed becomes a single sorted set lookup instead of a complex multi-join query.

Step 1: Build the Fanout Engine

typescript
// src/feed/fanout-engine.ts — Fanout-on-write activity feed with Redis sorted sets
import { Redis } from "ioredis";
import { pool } from "../db";

const redis = new Redis(process.env.REDIS_URL!);

interface Activity {
  id: string;
  actorId: string;
  actorName: string;
  actorAvatar: string;
  verb: string;               // "deployed", "starred", "published", "commented"
  objectType: string;         // "project", "post", "comment"
  objectId: string;
  objectTitle: string;
  targetType?: string;        // "repository", "team"
  targetId?: string;
  targetTitle?: string;
  metadata?: Record<string, any>;
  timestamp: number;
}

// Maximum feed size per user (oldest items are trimmed)
const MAX_FEED_SIZE = 500;
// Users with more followers use pull-based (celebrity problem)
const FANOUT_THRESHOLD = 10000;

export async function publishActivity(activity: Activity): Promise<{
  fanoutCount: number;
  strategy: "fanout" | "lazy";
}> {
  const activityStr = JSON.stringify(activity);

  // Store the activity in the author's activity list (source of truth)
  await redis.zadd(`activities:${activity.actorId}`, activity.timestamp, activityStr);
  await redis.zremrangebyrank(`activities:${activity.actorId}`, 0, -MAX_FEED_SIZE - 1);

  // Store in database for durability
  await pool.query(
    `INSERT INTO activities (id, actor_id, verb, object_type, object_id, object_title, target_type, target_id, metadata, created_at)
     VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, to_timestamp($10 / 1000.0))`,
    [activity.id, activity.actorId, activity.verb, activity.objectType,
     activity.objectId, activity.objectTitle, activity.targetType,
     activity.targetId, JSON.stringify(activity.metadata || {}), activity.timestamp]
  );

  // Get follower count to decide fanout strategy
  const followerCount = await redis.scard(`followers:${activity.actorId}`);

  if (followerCount > FANOUT_THRESHOLD) {
    // Celebrity/high-follower accounts: mark as "lazy fanout"
    // Followers will pull this user's activities when reading their feed
    await redis.sadd("lazy_fanout_users", activity.actorId);
    return { fanoutCount: 0, strategy: "lazy" };
  }

  // Standard fanout: push activity to each follower's feed
  const followers = await redis.smembers(`followers:${activity.actorId}`);
  const pipeline = redis.pipeline();

  for (const followerId of followers) {
    pipeline.zadd(`feed:${followerId}`, activity.timestamp, activityStr);
    pipeline.zremrangebyrank(`feed:${followerId}`, 0, -MAX_FEED_SIZE - 1);
  }

  // Also add to the actor's own feed
  pipeline.zadd(`feed:${activity.actorId}`, activity.timestamp, activityStr);

  await pipeline.exec();

  return { fanoutCount: followers.length, strategy: "fanout" };
}

// Read a user's feed with pagination
export async function readFeed(
  userId: string,
  cursor: number = Date.now(),
  limit: number = 20
): Promise<{ items: Activity[]; nextCursor: number | null; hasMore: boolean }> {
  // Get fanout feed items
  const feedItems = await redis.zrevrangebyscore(
    `feed:${userId}`,
    cursor - 1,           // exclusive of cursor
    "-inf",
    "LIMIT", 0, limit + 1
  );

  // Merge with lazy fanout users (celebrities the user follows)
  const lazyUsers = await redis.sinter(`following:${userId}`, "lazy_fanout_users");

  let allItems: Activity[] = feedItems.map((item) => JSON.parse(item));

  if (lazyUsers.length > 0) {
    // Pull latest activities from celebrity users
    const lazyPipeline = redis.pipeline();
    for (const lazyUserId of lazyUsers) {
      lazyPipeline.zrevrangebyscore(
        `activities:${lazyUserId}`,
        cursor - 1, "-inf",
        "LIMIT", 0, limit
      );
    }
    const lazyResults = await lazyPipeline.exec();

    for (const [err, items] of lazyResults!) {
      if (err || !items) continue;
      for (const item of items as string[]) {
        allItems.push(JSON.parse(item));
      }
    }

    // Sort merged results by timestamp
    allItems.sort((a, b) => b.timestamp - a.timestamp);
    allItems = allItems.slice(0, limit + 1);
  }

  // Aggregate similar activities (e.g., "3 people starred your project")
  const aggregated = aggregateActivities(allItems.slice(0, limit));

  const hasMore = allItems.length > limit;
  const nextCursor = hasMore ? allItems[limit - 1].timestamp : null;

  return { items: aggregated, nextCursor, hasMore };
}

// Aggregate repeated patterns (e.g., multiple stars on same project)
function aggregateActivities(activities: Activity[]): Activity[] {
  const groups = new Map<string, Activity[]>();

  for (const activity of activities) {
    // Group by verb + object within 1 hour window
    const groupKey = `${activity.verb}:${activity.objectId}:${Math.floor(activity.timestamp / 3600000)}`;

    if (!groups.has(groupKey)) groups.set(groupKey, []);
    groups.get(groupKey)!.push(activity);
  }

  const result: Activity[] = [];

  for (const [_, group] of groups) {
    if (group.length > 1) {
      // Aggregate: "Alice, Bob, and 3 others starred project-x"
      const first = { ...group[0] };
      first.metadata = {
        ...first.metadata,
        aggregatedCount: group.length,
        aggregatedActors: group.map((a) => ({
          id: a.actorId,
          name: a.actorName,
          avatar: a.actorAvatar,
        })),
      };
      result.push(first);
    } else {
      result.push(group[0]);
    }
  }

  return result.sort((a, b) => b.timestamp - a.timestamp);
}

// Follow/unfollow management
export async function follow(followerId: string, followeeId: string): Promise<void> {
  await redis.sadd(`following:${followerId}`, followeeId);
  await redis.sadd(`followers:${followeeId}`, followerId);

  // Backfill: add recent activities from the followee to the follower's feed
  const recentActivities = await redis.zrevrange(`activities:${followeeId}`, 0, 19, "WITHSCORES");

  if (recentActivities.length > 0) {
    const pipeline = redis.pipeline();
    for (let i = 0; i < recentActivities.length; i += 2) {
      pipeline.zadd(`feed:${followerId}`, parseFloat(recentActivities[i + 1]), recentActivities[i]);
    }
    await pipeline.exec();
  }
}

export async function unfollow(followerId: string, followeeId: string): Promise<void> {
  await redis.srem(`following:${followerId}`, followeeId);
  await redis.srem(`followers:${followeeId}`, followerId);

  // Remove unfollowed user's activities from feed (async, best effort)
  const feedItems = await redis.zrange(`feed:${followerId}`, 0, -1);
  const pipeline = redis.pipeline();
  for (const item of feedItems) {
    const activity: Activity = JSON.parse(item);
    if (activity.actorId === followeeId) {
      pipeline.zrem(`feed:${followerId}`, item);
    }
  }
  await pipeline.exec();
}

Step 2: Build the Feed API

typescript
// src/routes/feed.ts — Activity feed REST API with real-time SSE
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { readFeed, publishActivity, follow, unfollow } from "../feed/fanout-engine";
import { Redis } from "ioredis";

const app = new Hono();

// Get user's feed
app.get("/feed", async (c) => {
  const userId = c.get("userId");
  const cursor = c.req.query("cursor") ? parseInt(c.req.query("cursor")!) : Date.now();
  const limit = Math.min(parseInt(c.req.query("limit") || "20"), 50);

  const feed = await readFeed(userId, cursor, limit);
  return c.json(feed);
});

// Real-time feed updates via SSE
app.get("/feed/stream", async (c) => {
  const userId = c.get("userId");
  const sub = new Redis(process.env.REDIS_URL!);

  return streamSSE(c, async (stream) => {
    await sub.subscribe(`feed:realtime:${userId}`);

    sub.on("message", async (_, message) => {
      await stream.writeSSE({ data: message, event: "activity" });
    });

    stream.onAbort(() => {
      sub.unsubscribe();
      sub.disconnect();
    });
  });
});

// Follow a user
app.post("/follow/:userId", async (c) => {
  const followerId = c.get("userId");
  const followeeId = c.req.param("userId");
  await follow(followerId, followeeId);
  return c.json({ following: true });
});

// Unfollow
app.delete("/follow/:userId", async (c) => {
  const followerId = c.get("userId");
  const followeeId = c.req.param("userId");
  await unfollow(followerId, followeeId);
  return c.json({ following: false });
});

export default app;

Results

  • Feed load time dropped from 2.5s to 8ms — fanout-on-write means reading a feed is a single Redis ZREVRANGE; no joins, no aggregation at read time
  • Celebrity problem solved — users with 10K+ followers use lazy fanout (pull-based); their activities are merged at read time, preventing write amplification
  • Activity aggregation reduces noise — "5 people starred your project" replaces 5 individual feed items; users see meaningful summaries instead of repetitive entries
  • Real-time updates via SSE — new activities appear in the feed without page refresh; engagement metrics show 23% more feed interactions after adding real-time
  • Feed backfill on follow — when you follow someone, their recent 20 activities appear in your feed immediately; no "empty feed" problem for new follows