Terminal.skills

Build an ML Feature Store for Production Recommendations

Stop recomputing features on every request. Build a feature store that serves pre-computed ML features in under 5ms, handles feature drift monitoring, and eliminates training-serving skew.

#redis #caching #database #pub-sub #queues
Works with: claude-code · openai-codex · gemini-cli · cursor

Skills stack · 7 skills · average quality 91/100 · all SAFE

  • typescript: not yet scored
  • redis v1.0.0 (93/100 quality · 1.81× impact · SAFE): Build applications with Redis: caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.
  • postgresql v1.0.0 (87/100 quality · 1.53× impact · SAFE): Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.
  • kafka-js: not yet scored
  • zod v1.0.0 (100/100 quality · 1.21× impact · SAFE): You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time, eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.
  • hono v1.0.0 (93/100 quality · 3.00× impact · SAFE): You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge, with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.
  • vitest v1.0.0 (80/100 quality · 1.64× impact · SAFE): You are an expert in Vitest, the Vite-native testing framework. You help developers write and run unit tests, integration tests, and component tests with native TypeScript support, Jest-compatible API, built-in mocking, code coverage, snapshot testing, and watch mode, leveraging Vite's transform pipeline for instant test execution without separate compilation.

The Problem

Carlos leads ML engineering at a media streaming platform with 2M active users. The team's recommendation model performs well in notebooks but poorly in production. The first problem: features computed during training (7-day watch history, genre preferences, time-of-day patterns) are recomputed on every API request with raw SQL queries, adding 800ms of latency. Worse, the training pipeline computes features differently from the serving code. This training-serving skew means the production model effectively sees different data than it was trained on. Last month, a seemingly small change to the feature computation logic in the serving path caused a 23% drop in click-through rate before anyone noticed.

Carlos needs:

  • Single feature definition used in both training and serving — eliminates skew
  • Pre-computed features served from Redis in <5ms, not computed on-the-fly
  • Real-time feature updates — when a user watches something, their features update within seconds
  • Feature versioning — roll back to previous feature definitions without retraining
  • Drift monitoring — detect when feature distributions shift from training data
  • Point-in-time correctness — training features reflect what was known at that time, not the future

Step 1: Feature Definitions as Code

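Define features once and use them everywhere. Each definition declares the feature's name, version, type, entity, freshness tier, and default value, plus optional expected statistics for drift monitoring. The computation functions live in Step 2, and the Redis storage key is derived from the entity, feature name, and version in Step 3.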

typescript
// src/features/definitions.ts
// Single source of truth for feature computations

import { z } from 'zod';

export const FeatureType = z.enum(['numeric', 'categorical', 'embedding', 'list']);

export const FeatureDefinition = z.object({
  name: z.string().regex(/^[a-z][a-z0-9_]*$/),
  version: z.number().int().positive(),
  type: FeatureType,
  description: z.string(),
  entity: z.enum(['user', 'item', 'user_item']),  // what the feature describes
  freshness: z.enum(['realtime', 'hourly', 'daily']),
  defaultValue: z.unknown(),
  // Expected distribution for drift detection
  expectedStats: z.object({
    mean: z.number().optional(),
    stddev: z.number().optional(),
    min: z.number().optional(),
    max: z.number().optional(),
    topCategories: z.array(z.string()).optional(),
  }).optional(),
});

export type FeatureDefinition = z.infer<typeof FeatureDefinition>;

export const featureRegistry: FeatureDefinition[] = [
  {
    name: 'user_watch_count_7d',
    version: 2,
    type: 'numeric',
    description: 'Number of videos watched in the last 7 days',
    entity: 'user',
    freshness: 'realtime',
    defaultValue: 0,
    expectedStats: { mean: 12.5, stddev: 8.3, min: 0, max: 200 },
  },
  {
    name: 'user_genre_preferences',
    version: 1,
    type: 'embedding',
    description: '32-dim vector of genre affinity scores based on watch history',
    entity: 'user',
    freshness: 'hourly',
    defaultValue: new Array(32).fill(0),
  },
  {
    name: 'user_avg_watch_duration_min',
    version: 1,
    type: 'numeric',
    description: 'Average watch duration in minutes over last 30 days',
    entity: 'user',
    freshness: 'daily',
    defaultValue: 0,
  },
  {
    name: 'user_preferred_hour',
    version: 1,
    type: 'numeric',
    description: 'Hour of day (0-23) when user most often watches',
    entity: 'user',
    freshness: 'daily',
    defaultValue: 20,  // 8 PM default
  },
  {
    name: 'item_popularity_score',
    version: 1,
    type: 'numeric',
    description: 'Normalized popularity score (0-1) based on recent views',
    entity: 'item',
    freshness: 'hourly',
    defaultValue: 0,
    expectedStats: { mean: 0.15, stddev: 0.2, min: 0, max: 1 },
  },
  {
    name: 'item_genre',
    version: 1,
    type: 'categorical',
    description: 'Primary genre of the content',
    entity: 'item',
    freshness: 'daily',
    defaultValue: 'unknown',
    expectedStats: { topCategories: ['drama', 'comedy', 'action', 'documentary', 'thriller'] },
  },
  {
    name: 'user_item_watch_progress',
    version: 1,
    type: 'numeric',
    description: 'How much of this item the user has watched (0-1)',
    entity: 'user_item',
    freshness: 'realtime',
    defaultValue: 0,
  },
];
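Because the registry is written by hand, it can disagree with its own metadata. A minimal startup check might catch duplicate names, embedding defaults of the wrong length, and numeric defaults outside the declared range. The `validateRegistry` helper below is hypothetical and not part of the original code; `Def` is a trimmed local copy of `FeatureDefinition` so the snippet runs on its own, and the 32-dimension embedding size is taken from the `user_genre_preferences` description.

```typescript
// Hypothetical startup check for a hand-written feature registry.
// `Def` is a trimmed local copy of FeatureDefinition so this runs standalone.
export type Def = {
  name: string;
  version: number;
  type: 'numeric' | 'categorical' | 'embedding' | 'list';
  defaultValue?: unknown;
  expectedStats?: { min?: number; max?: number };
};

const EMBEDDING_DIM = 32; // assumption: every embedding feature is 32-dimensional

export function validateRegistry(defs: Def[]): string[] {
  const problems: string[] = [];
  const seen = new Set<string>();

  for (const d of defs) {
    // Two entries with the same name would make the "latest" alias ambiguous
    if (seen.has(d.name)) problems.push(`${d.name}: duplicate name`);
    seen.add(d.name);

    if (d.type === 'embedding') {
      const ok = Array.isArray(d.defaultValue) && d.defaultValue.length === EMBEDDING_DIM;
      if (!ok) problems.push(`${d.name}: default is not a ${EMBEDDING_DIM}-dim array`);
    }

    if (d.type === 'numeric') {
      const v = d.defaultValue;
      if (typeof v !== 'number') {
        problems.push(`${d.name}: numeric default is not a number`);
      } else {
        const min = d.expectedStats?.min;
        const max = d.expectedStats?.max;
        // A default outside the expected range would itself look like drift
        if ((min !== undefined && v < min) || (max !== undefined && v > max)) {
          problems.push(`${d.name}: default ${v} outside [${min}, ${max}]`);
        }
      }
    }
  }
  return problems;
}
```

At startup, a service could call `validateRegistry(featureRegistry)` and refuse to boot if the returned list is non-empty.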

Step 2: Feature Computation Pipeline

typescript
// src/features/compute.ts
// Computes features from raw data; used by both batch and streaming pipelines

import { Pool } from 'pg';

const db = new Pool({ connectionString: process.env.DATABASE_URL });

// These functions are the SINGLE source of truth for feature computation.
// Training pipeline and serving pipeline both call these.
// Never duplicate this logic.
//
// Each function takes an `asOf` timestamp instead of calling NOW() directly:
// serving uses the current time (the default), while training passes the
// label timestamp so features only see events that had happened by then.

export async function computeUserWatchCount7d(
  userId: string,
  asOf: Date = new Date()
): Promise<number> {
  const result = await db.query(
    `SELECT COUNT(*) AS count FROM watch_events
     WHERE user_id = $1
       AND watched_at >  $2::timestamptz - INTERVAL '7 days'
       AND watched_at <= $2::timestamptz`,
    [userId, asOf]
  );
  // pg returns COUNT(*) (a bigint) as a string
  return Number(result.rows[0].count);
}

export async function computeUserGenrePreferences(
  userId: string,
  asOf: Date = new Date()
): Promise<number[]> {
  // Duration-weighted average of genre embeddings over the 100 most recent watches
  const result = await db.query(
    `SELECT g.embedding, w.watch_duration_min
     FROM watch_events w
     JOIN items i ON w.item_id = i.id
     JOIN genres g ON i.genre_id = g.id
     WHERE w.user_id = $1
       AND w.watched_at >  $2::timestamptz - INTERVAL '30 days'
       AND w.watched_at <= $2::timestamptz
     ORDER BY w.watched_at DESC
     LIMIT 100`,
    [userId, asOf]
  );

  const embedding = new Array(32).fill(0);
  // NUMERIC columns can arrive from pg as strings, so coerce explicitly
  const totalDuration = result.rows.reduce(
    (s, r) => s + Number(r.watch_duration_min), 0
  );
  // No history (or only zero-length watches): return the default vector
  if (!result.rows.length || totalDuration <= 0) return embedding;

  // Longer watches count more; recency only decides which 100 rows are included
  for (const row of result.rows) {
    const weight = Number(row.watch_duration_min) / totalDuration;
    const emb = row.embedding as number[];  // assumes a float8[] column of 32 values
    for (let i = 0; i < 32; i++) {
      embedding[i] += emb[i] * weight;
    }
  }

  return embedding;
}

export async function computeItemPopularity(
  itemId: string,
  asOf: Date = new Date()
): Promise<number> {
  // Exponential decay: each view contributes exp(-0.1 × age in days)
  const result = await db.query(
    `SELECT SUM(
       EXP(-0.1 * EXTRACT(EPOCH FROM ($2::timestamptz - watched_at)) / 86400)
     ) AS score
     FROM watch_events
     WHERE item_id = $1
       AND watched_at >  $2::timestamptz - INTERVAL '14 days'
       AND watched_at <= $2::timestamptz`,
    [itemId, asOf]
  );

  // SUM over zero rows is NULL
  const raw = Number(result.rows[0].score ?? 0);
  // Normalize to 0-1 (calibrated from historical max)
  return Math.min(1, raw / 1000);
}
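Windowed SQL like this is hard to unit-test and easy to get subtly wrong at the window edges. One option, sketched here as an assumption rather than part of the original pipeline, is to mirror each windowed computation as a pure function over in-memory events with an explicit `asOf` upper bound. The `WatchEvent` shape and the function names are hypothetical; the 7-day and 14-day windows, the 0.1-per-day decay rate, and the divide-by-1000 normalization come from the queries above.

```typescript
// Hypothetical in-memory mirrors of the SQL feature logic, for tests and
// offline backfills. Each takes an explicit asOf so that only events that
// had already happened are counted (no future leakage).
export interface WatchEvent {
  itemId: string;
  watchedAt: Date;
}

const DAY_MS = 86_400_000;

// Window is (asOf - days, asOf]: anything after asOf is invisible
function inWindow(e: WatchEvent, asOf: Date, days: number): boolean {
  const t = e.watchedAt.getTime();
  return t > asOf.getTime() - days * DAY_MS && t <= asOf.getTime();
}

// Mirrors computeUserWatchCount7d for one user's events
export function watchCount7d(events: WatchEvent[], asOf: Date): number {
  return events.filter(e => inWindow(e, asOf, 7)).length;
}

// Mirrors computeItemPopularity: sum of exp(-0.1 * ageDays) over 14 days, capped at 1
export function itemPopularity(events: WatchEvent[], itemId: string, asOf: Date): number {
  const raw = events
    .filter(e => e.itemId === itemId && inWindow(e, asOf, 14))
    .reduce((sum, e) => sum + Math.exp((-0.1 * (asOf.getTime() - e.watchedAt.getTime())) / DAY_MS), 0);
  return Math.min(1, raw / 1000);
}
```

In a training backfill, calling `watchCount7d(events, labelTime)` for each labeled example yields the value the model would have seen at that moment: an event one second after `labelTime` is not counted.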

Step 3: Feature Store (Write + Read)

typescript
// src/store/feature-store.ts
// Writes features to Redis, reads with sub-5ms latency

import { Redis } from 'ioredis';
import { featureRegistry, type FeatureDefinition } from '../features/definitions';

const redis = new Redis(process.env.REDIS_URL!);

const freshnessTTL: Record<FeatureDefinition['freshness'], number> = {
  // Streaming rewrites realtime features only when new events arrive, so an idle
  // user's values expire after 5 min and readers fall back to defaults unless the
  // batch pipeline also refreshes them.
  realtime: 300,      // 5 min
  hourly: 7_200,      // 2 hours
  daily: 172_800,     // 2 days
};

// Key format: fs:{entity}:{entityId}:{featureName}:v{version}
function featureKey(entity: string, entityId: string, feature: FeatureDefinition): string {
  return `fs:${entity}:${entityId}:${feature.name}:v${feature.version}`;
}

// Unversioned alias that the serving path reads
function latestKey(entity: string, entityId: string, featureName: string): string {
  return `fs:${entity}:${entityId}:${featureName}:latest`;
}

export async function writeFeature(
  entityId: string,
  featureName: string,
  value: unknown
): Promise<void> {
  const feature = featureRegistry.find(f => f.name === featureName);
  if (!feature) throw new Error(`Unknown feature: ${featureName}`);

  const ttl = freshnessTTL[feature.freshness];
  const payload = JSON.stringify(value);

  // Write the versioned key and the "latest" alias together in one MULTI
  await redis
    .multi()
    .setex(featureKey(feature.entity, entityId, feature), ttl, payload)
    .setex(latestKey(feature.entity, entityId, featureName), ttl, payload)
    .exec();
}

export async function readFeatures(
  entity: string,
  entityId: string,
  featureNames: string[]
): Promise<Record<string, unknown>> {
  if (featureNames.length === 0) return {};  // MGET with no keys is a Redis error

  // Single MGET call: one round trip for all features
  const values = await redis.mget(
    ...featureNames.map(name => latestKey(entity, entityId, name))
  );

  const result: Record<string, unknown> = {};
  featureNames.forEach((name, i) => {
    const raw = values[i];
    if (raw !== null) {
      result[name] = JSON.parse(raw);
    } else {
      // Missing or expired: fall back to the registry default
      const feature = featureRegistry.find(f => f.name === name);
      result[name] = feature?.defaultValue ?? null;
    }
  });

  return result;
}

// Reads the current value of every feature for an entity.
// These are serving-time values, not point-in-time snapshots, so on their
// own they are not suitable for building historical training sets.
export async function readFeatureVector(
  entity: string,
  entityId: string
): Promise<Record<string, unknown>> {
  const features = featureRegistry.filter(f => f.entity === entity);
  return readFeatures(entity, entityId, features.map(f => f.name));
}
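The Redis keys hold only the current value, so point-in-time training data needs a separate history of feature values. The article does not show how its snapshots are stored; the sketch below assumes each write is also appended with its timestamp to an ordered history (for example a Postgres table or a Redis sorted set scored by time) and shows only the lookup: for each training label, take the latest snapshot at or before the label time, never after. `Snapshot` and `valueAsOf` are hypothetical names.

```typescript
// Hypothetical point-in-time lookup over one feature's snapshot history.
// Assumes snapshots are sorted by ts ascending (epoch milliseconds).
export interface Snapshot<T> {
  ts: number;
  value: T;
}

// Returns the value of the latest snapshot with ts <= asOf, or the default if none.
// Binary search keeps each lookup O(log n) for long histories.
export function valueAsOf<T>(history: Snapshot<T>[], asOf: number, defaultValue: T): T {
  let lo = 0;
  let hi = history.length - 1;
  let found = -1;
  while (lo <= hi) {
    const mid = (lo + hi) >> 1;
    if (history[mid].ts <= asOf) {
      found = mid;   // valid candidate; keep looking for a later one
      lo = mid + 1;
    } else {
      hi = mid - 1;  // this snapshot is in the future relative to asOf
    }
  }
  return found === -1 ? defaultValue : history[found].value;
}
```

Training code would call `valueAsOf` with each example's label timestamp. This "as of" join is what keeps values written after the label out of the training set.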

Step 4: Real-Time Feature Updates via Kafka

When a user watches something, their features update in seconds — not the next batch run.

typescript
// src/pipeline/streaming.ts
// Kafka consumer that updates features in real time

import { Kafka } from 'kafkajs';
import { writeFeature } from '../store/feature-store';
import { computeUserWatchCount7d } from '../features/compute';

const kafka = new Kafka({
  clientId: 'feature-store-updater',
  brokers: process.env.KAFKA_BROKERS?.split(',') ?? ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'feature-updates' });

interface WatchEvent {
  userId: string;
  itemId: string;
  watchDurationMin: number;
  progress: number;  // 0-1
}

export async function startStreamingUpdates(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: 'watch-events', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return;  // skip tombstones and empty messages
      const event = JSON.parse(message.value.toString()) as WatchEvent;

      // Update real-time features. Both writes overwrite rather than increment,
      // so a redelivered message does not double-count. The COUNT assumes the
      // event row is already committed to watch_events when the message arrives.
      await Promise.all([
        // Recompute watch count (fast: single COUNT query)
        computeUserWatchCount7d(event.userId)
          .then(count => writeFeature(event.userId, 'user_watch_count_7d', count)),

        // Update watch progress for this user-item pair
        writeFeature(
          `${event.userId}:${event.itemId}`,
          'user_item_watch_progress',
          event.progress
        ),
      ]);

      // Genre preferences are hourly; the batch pipeline recomputes them
      // instead of doing it on every event.
    },
  });
}
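The consumer trusts each message body. A malformed payload makes `JSON.parse` throw inside `eachMessage`, and kafkajs then retries the message rather than committing its offset, so a single bad event can hold up its partition. A defensive parser, sketched below with a hypothetical `parseWatchEvent` helper (not part of kafkajs), returns `null` for anything that does not match the expected shape so the handler can log and skip it. The field names and the 0-1 range for `progress` come from the event type above; rejecting negative durations is an added assumption.

```typescript
// Hypothetical defensive parser for watch-event payloads.
export interface WatchEvent {
  userId: string;
  itemId: string;
  watchDurationMin: number;
  progress: number; // 0-1
}

const isNonEmptyString = (v: unknown): v is string => typeof v === 'string' && v.length > 0;
const isFiniteNumber = (v: unknown): v is number => typeof v === 'number' && Number.isFinite(v);

export function parseWatchEvent(raw: string | null | undefined): WatchEvent | null {
  if (!raw) return null;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (typeof data !== 'object' || data === null) return null;

  const { userId, itemId, watchDurationMin, progress } = data as Record<string, unknown>;
  if (!isNonEmptyString(userId) || !isNonEmptyString(itemId)) return null;
  if (!isFiniteNumber(watchDurationMin) || watchDurationMin < 0) return null; // assumption
  if (!isFiniteNumber(progress) || progress < 0 || progress > 1) return null;

  return { userId, itemId, watchDurationMin, progress };
}
```

Inside `eachMessage`, the handler would call `parseWatchEvent(message.value?.toString())` and return early on `null`, ideally after logging the offset or forwarding the raw message to a dead-letter topic.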

Step 5: Feature Drift Monitor

Detect when production feature distributions diverge from training data.

typescript
// src/monitoring/drift-detector.ts
// Compares live feature distributions against training baselines

import { Redis } from 'ioredis';
import { featureRegistry } from '../features/definitions';

const redis = new Redis(process.env.REDIS_URL!);

interface DriftReport {
  featureName: string;
  metric: string;
  trainingValue: number;
  currentValue: number;
  driftPercent: number;
  alert: boolean;
}

export async function checkFeatureDrift(): Promise<DriftReport[]> {
  const reports: DriftReport[] = [];

  for (const feature of featureRegistry) {
    if (feature.type !== 'numeric' || !feature.expectedStats) continue;

    // Read the 1,000 most recent samples (sampleFeatureValue keeps up to 10K)
    const sampleKey = `fs:samples:${feature.name}`;
    const samples = await redis.lrange(sampleKey, 0, 999);
    if (samples.length < 100) continue;  // not enough data

    const values = samples.map(Number);
    const currentMean = values.reduce((a, b) => a + b, 0) / values.length;
    const currentStddev = Math.sqrt(
      values.reduce((sum, v) => sum + (v - currentMean) ** 2, 0) / values.length
    );

    // Check mean drift (skipped for a zero baseline, where relative drift is undefined)
    if (feature.expectedStats.mean !== undefined && feature.expectedStats.mean !== 0) {
      const driftPercent = Math.abs(
        (currentMean - feature.expectedStats.mean) / feature.expectedStats.mean
      ) * 100;

      reports.push({
        featureName: feature.name,
        metric: 'mean',
        trainingValue: feature.expectedStats.mean,
        currentValue: currentMean,
        driftPercent,
        alert: driftPercent > 20,  // >20% drift triggers alert
      });
    }

    // Check stddev drift (distribution shape change); needs a positive baseline
    if (feature.expectedStats.stddev !== undefined && feature.expectedStats.stddev > 0) {
      const driftPercent = Math.abs(
        (currentStddev - feature.expectedStats.stddev) / feature.expectedStats.stddev
      ) * 100;

      reports.push({
        featureName: feature.name,
        metric: 'stddev',
        trainingValue: feature.expectedStats.stddev,
        currentValue: currentStddev,
        driftPercent,
        alert: driftPercent > 30,
      });
    }
  }

  return reports;
}

// Record feature values for drift monitoring
export async function sampleFeatureValue(
  featureName: string,
  value: number
): Promise<void> {
  const key = `fs:samples:${featureName}`;
  // Push, trim, and refresh the TTL in a single round trip
  await redis
    .multi()
    .lpush(key, value)
    .ltrim(key, 0, 9999)  // keep last 10K samples
    .expire(key, 86400)   // 24h TTL
    .exec();
}
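`checkFeatureDrift` skips every non-numeric feature, so it cannot see a shift in a categorical feature such as `item_genre`. One simple check, offered here as an assumption rather than the article's method, reuses the `topCategories` baseline already in the registry: measure what fraction of recent samples fall outside those categories and alert when it exceeds a threshold. The `checkCategoricalDrift` name and the 10% threshold are hypothetical, and the samples would need a string-valued counterpart to `sampleFeatureValue`.

```typescript
// Hypothetical drift check for categorical features, based on the share of
// recent samples that fall outside the expected top categories.
export interface CategoricalDriftReport {
  featureName: string;
  unexpectedShare: number;        // fraction of samples not in topCategories
  counts: Record<string, number>; // observed frequency of each value
  alert: boolean;
}

export function checkCategoricalDrift(
  featureName: string,
  samples: string[],
  topCategories: string[],
  maxUnexpectedShare = 0.1, // assumption: alert above 10%
): CategoricalDriftReport | null {
  if (samples.length === 0) return null; // nothing to compare yet

  const expected = new Set(topCategories);
  const counts: Record<string, number> = {};
  let unexpected = 0;
  for (const s of samples) {
    counts[s] = (counts[s] ?? 0) + 1;
    if (!expected.has(s)) unexpected++;
  }

  const unexpectedShare = unexpected / samples.length;
  return { featureName, unexpectedShare, counts, alert: unexpectedShare > maxUnexpectedShare };
}
```

Because `topCategories` lists only the most common genres, some share of samples outside it is normal, so the threshold would need calibrating against that baseline. Storing the expected share of every category and comparing whole distributions would be a stronger test.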

Step 6: Serving API

typescript
// src/api/features.ts
// HTTP API for model serving; returns feature vectors in <5ms

import { Hono } from 'hono';
import { z } from 'zod';
import { readFeatures } from '../store/feature-store';
import { sampleFeatureValue } from '../monitoring/drift-detector';

const app = new Hono();

// Validate ranking requests at the boundary instead of trusting the body
const RankRequest = z.object({
  userId: z.string().min(1),
  itemIds: z.array(z.string().min(1)),
});

// Get features for recommendation model
app.get('/v1/features/user/:userId', async (c) => {
  const userId = c.req.param('userId');
  const start = performance.now();  // sub-millisecond resolution

  const features = await readFeatures('user', userId, [
    'user_watch_count_7d',
    'user_genre_preferences',
    'user_avg_watch_duration_min',
    'user_preferred_hour',
  ]);

  // Sample for drift monitoring (1% of requests); fire-and-forget
  if (Math.random() < 0.01) {
    const watchCount = features.user_watch_count_7d;
    if (typeof watchCount === 'number') {
      sampleFeatureValue('user_watch_count_7d', watchCount).catch(() => {});
    }
  }

  const latencyMs = performance.now() - start;
  c.header('X-Feature-Latency-Ms', latencyMs.toFixed(2));

  return c.json({ userId, features, latencyMs });
});

// Get features for a user-item pair (used during ranking)
app.post('/v1/features/rank', async (c) => {
  const parsed = RankRequest.safeParse(await c.req.json().catch(() => null));
  if (!parsed.success) return c.json({ error: 'Invalid request body' }, 400);
  const { userId, itemIds } = parsed.data;
  const start = performance.now();

  // User, item, and user-item reads are independent, so run them concurrently
  const [userFeatures, itemFeatures, interactions] = await Promise.all([
    readFeatures('user', userId, [
      'user_watch_count_7d',
      'user_genre_preferences',
      'user_preferred_hour',
    ]),
    Promise.all(itemIds.map(itemId =>
      readFeatures('item', itemId, ['item_popularity_score', 'item_genre']))),
    Promise.all(itemIds.map(itemId =>
      readFeatures('user_item', `${userId}:${itemId}`, ['user_item_watch_progress']))),
  ]);

  return c.json({
    userId,
    userFeatures,
    // Promise.all preserves input order, so index i matches itemIds[i]
    items: itemIds.map((itemId, i) => ({
      itemId,
      itemFeatures: itemFeatures[i],
      interactionFeatures: interactions[i],
    })),
    latencyMs: performance.now() - start,
  });
});

export default app;
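The ranking endpoint issues one MGET for the user plus one per item for item features and one per item for interaction features, so N candidates cost 2N + 1 Redis commands. A possible refinement, sketched here and not part of the original service, builds every key up front so a whole ranking request is served by a single MGET. `buildRankKeys` and `splitRankValues` are hypothetical helpers; the key layout follows the `fs:{entity}:{entityId}:{featureName}:latest` alias from Step 3.

```typescript
// Hypothetical helpers for serving a whole ranking request with one MGET.
// Key format matches the "latest" alias: fs:{entity}:{entityId}:{feature}:latest
const latest = (entity: string, id: string, feature: string) =>
  `fs:${entity}:${id}:${feature}:latest`;

export const USER_FEATURES = ['user_watch_count_7d', 'user_genre_preferences', 'user_preferred_hour'];
export const ITEM_FEATURES = ['item_popularity_score', 'item_genre'];
export const PAIR_FEATURES = ['user_item_watch_progress'];

// Layout: user features first, then for each item its item features and pair features
export function buildRankKeys(userId: string, itemIds: string[]): string[] {
  const keys = USER_FEATURES.map(f => latest('user', userId, f));
  for (const itemId of itemIds) {
    keys.push(...ITEM_FEATURES.map(f => latest('item', itemId, f)));
    keys.push(...PAIR_FEATURES.map(f => latest('user_item', `${userId}:${itemId}`, f)));
  }
  return keys;
}

type Row = Record<string, string | null>;

// Splits MGET results back into per-entity records (raw JSON strings or null),
// walking the values in exactly the order buildRankKeys produced the keys
export function splitRankValues(itemIds: string[], values: (string | null)[]) {
  let i = 0;
  const take = (names: string[]): Row =>
    Object.fromEntries(names.map(n => [n, values[i++] ?? null]));

  const user = take(USER_FEATURES);
  const items = itemIds.map(itemId => ({
    itemId,
    itemFeatures: take(ITEM_FEATURES),
    interactionFeatures: take(PAIR_FEATURES),
  }));
  return { user, items };
}
```

In the handler this would become `const values = await redis.mget(...buildRankKeys(userId, itemIds))`, followed by `JSON.parse` and the registry-default fallback on each entry, as `readFeatures` already does.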

Results

After 8 weeks in production serving 2M users:

  • Feature serving latency: 3.2ms p50, 4.8ms p95 (was 800ms computing on-the-fly)
  • Training-serving skew: eliminated — same compute functions used in both pipelines
  • CTR improvement: +18% after eliminating skew (model finally sees the same data in production)
  • Real-time feature freshness: watch count updates within 2 seconds of a watch event
  • Drift detection caught a bug in genre classification 4 hours after deployment — auto-alerted, rolled back
  • Point-in-time training uses historical feature snapshots — no data leakage
  • Feature computation cost: dropped 94% (batch pre-compute vs per-request SQL)
  • Redis memory: 4.2GB for 2M users × 7 features + 500K items × 2 features
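The latency figures above are p50 and p95. The article does not say how they were aggregated; one common way to compute them from collected per-request values (for example the `X-Feature-Latency-Ms` header) is the nearest-rank method, sketched below with a hypothetical `percentile` helper. Monitoring systems often use interpolating or histogram-based estimates instead, which can give slightly different numbers.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of
// samples are less than or equal to it.
export function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples');
  if (p <= 0 || p > 100) throw new Error('p must be in (0, 100]');
  const sorted = [...samples].sort((a, b) => a - b); // numeric, not lexicographic
  const rank = Math.ceil((p / 100) * sorted.length);  // 1-based rank
  return sorted[rank - 1];
}
```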