The Problem
Mika leads product at a 30-person developer platform with 45,000 users. They want to add an activity feed — showing what people you follow are building, deploying, and sharing. The naive approach (query all followed users' activities on every page load) takes 2-3 seconds with 500+ follows. Twitter, GitHub, and LinkedIn use "fanout-on-write" — when someone posts, their activity is pushed to all followers' feeds immediately. Reading a feed becomes a single sorted set lookup instead of a complex multi-join query.
Step 1: Build the Fanout Engine
typescript
// src/feed/fanout-engine.ts — Fanout-on-write activity feed with Redis sorted sets
import { Redis } from "ioredis";
import { pool } from "../db";
const redis = new Redis(process.env.REDIS_URL!);
interface Activity {
id: string;
actorId: string;
actorName: string;
actorAvatar: string;
verb: string; // "deployed", "starred", "published", "commented"
objectType: string; // "project", "post", "comment"
objectId: string;
objectTitle: string;
targetType?: string; // "repository", "team"
targetId?: string;
targetTitle?: string;
metadata?: Record<string, any>;
timestamp: number;
}
// Maximum feed size per user (oldest items are trimmed)
const MAX_FEED_SIZE = 500;
// Users with more followers use pull-based (celebrity problem)
const FANOUT_THRESHOLD = 10000;
export async function publishActivity(activity: Activity): Promise<{
fanoutCount: number;
strategy: "fanout" | "lazy";
}> {
const activityStr = JSON.stringify(activity);
// Store the activity in the author's activity list (source of truth)
await redis.zadd(`activities:${activity.actorId}`, activity.timestamp, activityStr);
await redis.zremrangebyrank(`activities:${activity.actorId}`, 0, -MAX_FEED_SIZE - 1);
// Store in database for durability
await pool.query(
`INSERT INTO activities (id, actor_id, verb, object_type, object_id, object_title, target_type, target_id, metadata, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, to_timestamp($10 / 1000.0))`,
[activity.id, activity.actorId, activity.verb, activity.objectType,
activity.objectId, activity.objectTitle, activity.targetType,
activity.targetId, JSON.stringify(activity.metadata || {}), activity.timestamp]
);
// Get follower count to decide fanout strategy
const followerCount = await redis.scard(`followers:${activity.actorId}`);
if (followerCount > FANOUT_THRESHOLD) {
// Celebrity/high-follower accounts: mark as "lazy fanout"
// Followers will pull this user's activities when reading their feed
await redis.sadd("lazy_fanout_users", activity.actorId);
return { fanoutCount: 0, strategy: "lazy" };
}
// Standard fanout: push activity to each follower's feed
const followers = await redis.smembers(`followers:${activity.actorId}`);
const pipeline = redis.pipeline();
for (const followerId of followers) {
pipeline.zadd(`feed:${followerId}`, activity.timestamp, activityStr);
pipeline.zremrangebyrank(`feed:${followerId}`, 0, -MAX_FEED_SIZE - 1);
}
// Also add to the actor's own feed
pipeline.zadd(`feed:${activity.actorId}`, activity.timestamp, activityStr);
await pipeline.exec();
return { fanoutCount: followers.length, strategy: "fanout" };
}
// Read a user's feed with pagination
export async function readFeed(
userId: string,
cursor: number = Date.now(),
limit: number = 20
): Promise<{ items: Activity[]; nextCursor: number | null; hasMore: boolean }> {
// Get fanout feed items
const feedItems = await redis.zrevrangebyscore(
`feed:${userId}`,
cursor - 1, // exclusive of cursor
"-inf",
"LIMIT", 0, limit + 1
);
// Merge with lazy fanout users (celebrities the user follows)
const lazyUsers = await redis.sinter(`following:${userId}`, "lazy_fanout_users");
let allItems: Activity[] = feedItems.map((item) => JSON.parse(item));
if (lazyUsers.length > 0) {
// Pull latest activities from celebrity users
const lazyPipeline = redis.pipeline();
for (const lazyUserId of lazyUsers) {
lazyPipeline.zrevrangebyscore(
`activities:${lazyUserId}`,
cursor - 1, "-inf",
"LIMIT", 0, limit
);
}
const lazyResults = await lazyPipeline.exec();
for (const [err, items] of lazyResults!) {
if (err || !items) continue;
for (const item of items as string[]) {
allItems.push(JSON.parse(item));
}
}
// Sort merged results by timestamp
allItems.sort((a, b) => b.timestamp - a.timestamp);
allItems = allItems.slice(0, limit + 1);
}
// Aggregate similar activities (e.g., "3 people starred your project")
const aggregated = aggregateActivities(allItems.slice(0, limit));
const hasMore = allItems.length > limit;
const nextCursor = hasMore ? allItems[limit - 1].timestamp : null;
return { items: aggregated, nextCursor, hasMore };
}
// Aggregate repeated patterns (e.g., multiple stars on same project)
function aggregateActivities(activities: Activity[]): Activity[] {
const groups = new Map<string, Activity[]>();
for (const activity of activities) {
// Group by verb + object within 1 hour window
const groupKey = `${activity.verb}:${activity.objectId}:${Math.floor(activity.timestamp / 3600000)}`;
if (!groups.has(groupKey)) groups.set(groupKey, []);
groups.get(groupKey)!.push(activity);
}
const result: Activity[] = [];
for (const [_, group] of groups) {
if (group.length > 1) {
// Aggregate: "Alice, Bob, and 3 others starred project-x"
const first = { ...group[0] };
first.metadata = {
...first.metadata,
aggregatedCount: group.length,
aggregatedActors: group.map((a) => ({
id: a.actorId,
name: a.actorName,
avatar: a.actorAvatar,
})),
};
result.push(first);
} else {
result.push(group[0]);
}
}
return result.sort((a, b) => b.timestamp - a.timestamp);
}
// Follow/unfollow management
export async function follow(followerId: string, followeeId: string): Promise<void> {
await redis.sadd(`following:${followerId}`, followeeId);
await redis.sadd(`followers:${followeeId}`, followerId);
// Backfill: add recent activities from the followee to the follower's feed
const recentActivities = await redis.zrevrange(`activities:${followeeId}`, 0, 19, "WITHSCORES");
if (recentActivities.length > 0) {
const pipeline = redis.pipeline();
for (let i = 0; i < recentActivities.length; i += 2) {
pipeline.zadd(`feed:${followerId}`, parseFloat(recentActivities[i + 1]), recentActivities[i]);
}
await pipeline.exec();
}
}
export async function unfollow(followerId: string, followeeId: string): Promise<void> {
await redis.srem(`following:${followerId}`, followeeId);
await redis.srem(`followers:${followeeId}`, followerId);
// Remove unfollowed user's activities from feed (async, best effort)
const feedItems = await redis.zrange(`feed:${followerId}`, 0, -1);
const pipeline = redis.pipeline();
for (const item of feedItems) {
const activity: Activity = JSON.parse(item);
if (activity.actorId === followeeId) {
pipeline.zrem(`feed:${followerId}`, item);
}
}
await pipeline.exec();
}
Step 2: Build the Feed API
typescript
// src/routes/feed.ts — Activity feed REST API with real-time SSE
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { readFeed, publishActivity, follow, unfollow } from "../feed/fanout-engine";
import { Redis } from "ioredis";
const app = new Hono();
// Get user's feed
app.get("/feed", async (c) => {
const userId = c.get("userId");
const cursor = c.req.query("cursor") ? parseInt(c.req.query("cursor")!) : Date.now();
const limit = Math.min(parseInt(c.req.query("limit") || "20"), 50);
const feed = await readFeed(userId, cursor, limit);
return c.json(feed);
});
// Real-time feed updates via SSE
app.get("/feed/stream", async (c) => {
const userId = c.get("userId");
const sub = new Redis(process.env.REDIS_URL!);
return streamSSE(c, async (stream) => {
await sub.subscribe(`feed:realtime:${userId}`);
sub.on("message", async (_, message) => {
await stream.writeSSE({ data: message, event: "activity" });
});
stream.onAbort(() => {
sub.unsubscribe();
sub.disconnect();
});
});
});
// Follow a user
app.post("/follow/:userId", async (c) => {
const followerId = c.get("userId");
const followeeId = c.req.param("userId");
await follow(followerId, followeeId);
return c.json({ following: true });
});
// Unfollow
app.delete("/follow/:userId", async (c) => {
const followerId = c.get("userId");
const followeeId = c.req.param("userId");
await unfollow(followerId, followeeId);
return c.json({ following: false });
});
export default app;
Results
- Feed load time dropped from 2.5s to 8ms — fanout-on-write means reading a feed is a single Redis ZREVRANGE; no joins, no aggregation at read time
- Celebrity problem solved — users with 10K+ followers use lazy fanout (pull-based); their activities are merged at read time, preventing write amplification
- Activity aggregation reduces noise — "5 people starred your project" replaces 5 individual feed items; users see meaningful summaries instead of repetitive entries
- Real-time updates via SSE — new activities appear in the feed without page refresh; engagement metrics show 23% more feed interactions after adding real-time
- Feed backfill on follow — when you follow someone, their recent 20 activities appear in your feed immediately; no "empty feed" problem for new follows