Terminal.skills

Build a Data Pipeline with Schema Evolution

Build a data ingestion pipeline that handles schema changes gracefully — auto-detecting new fields, managing backward compatibility, versioning schemas, and transforming data between schema versions without downtime.

Tags: #postgresql #database #sql #relational #jsonb
Works with: claude-code · openai-codex · gemini-cli · cursor

Skills stack · 5 skills

Avg quality 93/100 · All SAFE

typescript · Not yet scored

postgresql · v1.0.0 · 87/100 quality · 1.53× impact · SAFE

Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

redis · v1.0.0 · 93/100 quality · 1.81× impact · SAFE

Build applications with Redis: caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

zod · v1.0.0 · 100/100 quality · 1.21× impact · SAFE

You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time, eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

hono · v1.0.0 · 93/100 quality · 3.00× impact · SAFE

You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge, with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

The Problem

Viktor leads data engineering at a 40-person analytics company. They ingest data from 30+ client APIs, each sending JSON payloads that change without notice. Last month, a client added a metadata field that broke the pipeline because the Postgres column didn't exist. Another client renamed user_id to userId — 3 days of data was silently dropped because the column mapping failed. Every schema change requires a manual migration, a code change, and a redeployment. They need a pipeline that detects schema changes automatically, adapts without downtime, and maintains backward compatibility.
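The summary at the top promises transforming data between schema versions, but the steps below only detect and register versions. One common way to cover that part is an upcaster chain: one function per version step, applied in order to bring an old record up to the current shape. This is a minimal sketch under assumptions; the `upcasters` table, its version numbers, and the `metadata` default are hypothetical, and the v1 step simply reuses the `user_id` to `userId` rename from this scenario.

```typescript
type Row = Record<string, unknown>;
type Upcaster = (record: Row) => Row;

// Hypothetical upcasters, keyed by the version each one reads FROM (N -> N + 1).
const upcasters: Record<number, Upcaster> = {
  // v1 -> v2 (hypothetical): the client renamed user_id to userId
  1: ({ user_id, ...rest }) => (user_id === undefined ? rest : { ...rest, userId: user_id }),
  // v2 -> v3 (hypothetical): an optional metadata object appeared; default it to {}
  2: (record) => ({ metadata: {}, ...record }),
};

// Apply every step from fromVersion up to (but not including) toVersion.
function upcast(record: Row, fromVersion: number, toVersion: number): Row {
  let current = record;
  for (let v = fromVersion; v < toVersion; v++) {
    const step = upcasters[v];
    // Fail loudly instead of silently skipping a version
    if (!step) throw new Error(`No upcaster registered for v${v} -> v${v + 1}`);
    current = step(current);
  }
  return current;
}

const upgraded = upcast({ user_id: "u1", event: "click" }, 1, 3);
console.log(JSON.stringify(upgraded)); // {"metadata":{},"event":"click","userId":"u1"}
```

Because each step only knows about its neighbor, adding version N + 1 means writing one new function; old records stored in `_raw` can be replayed through the chain on read.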

Step 1: Build the Schema Registry

typescript
// src/schema/registry.ts — Schema registry with version tracking and compatibility checks
import { pool } from "../db";
import { z } from "zod";

interface SchemaVersion {
  id: string;
  source: string;                // "client-x-events", "webhook-stripe"
  version: number;
  fields: FieldDefinition[];
  createdAt: string;
  changeDescription: string;
}

interface FieldDefinition {
  name: string;
  type: "string" | "number" | "boolean" | "object" | "array" | "null";
  nullable: boolean;
  required: boolean;             // present in 95%+ of records
  examples: any[];               // sample values for documentation
  addedInVersion: number;
  removedInVersion?: number;
}

interface SchemaChange {
  type: "field_added" | "field_removed" | "field_type_changed" | "field_renamed";
  field: string;
  details: string;
  breaking: boolean;
}

export class SchemaRegistry {
  // Infer schema from a batch of records
  inferSchema(records: Record<string, any>[]): FieldDefinition[] {
    const fieldStats = new Map<string, {
      types: Map<string, number>;
      nullCount: number;
      presentCount: number;
      examples: Set<string>;
    }>();

    for (const record of records) {
      this.collectFieldStats(record, "", fieldStats, records.length);
    }

    const fields: FieldDefinition[] = [];
    for (const [name, stats] of fieldStats) {
      const dominantType = [...stats.types.entries()].sort((a, b) => b[1] - a[1])[0];
      
      fields.push({
        name,
        type: dominantType[0] as any,
        nullable: stats.nullCount > 0,
        required: stats.presentCount / records.length >= 0.95, // "present in 95%+ of records"
        examples: [...stats.examples].slice(0, 3),
        addedInVersion: 0, // will be set by registerSchema
      });
    }

    return fields;
  }

  private collectFieldStats(
    obj: any,
    prefix: string,
    stats: Map<string, any>,
    totalRecords: number
  ): void {
    for (const [key, value] of Object.entries(obj)) {
      const fullKey = prefix ? `${prefix}.${key}` : key;

      if (!stats.has(fullKey)) {
        stats.set(fullKey, {
          types: new Map(),
          nullCount: 0,
          presentCount: 0,
          examples: new Set(),
        });
      }

      const s = stats.get(fullKey)!;
      s.presentCount++;

      if (value === null || value === undefined) {
        s.nullCount++;
        s.types.set("null", (s.types.get("null") || 0) + 1);
      } else {
        const type = Array.isArray(value) ? "array" : typeof value;
        s.types.set(type, (s.types.get(type) || 0) + 1);

        if (type !== "object" && type !== "array" && s.examples.size < 3) {
          s.examples.add(JSON.stringify(value));
        }

        if (type === "object" && !Array.isArray(value)) {
          this.collectFieldStats(value, fullKey, stats, totalRecords);
        }
      }
    }
  }

  // Compare two schemas and detect changes
  detectChanges(oldFields: FieldDefinition[], newFields: FieldDefinition[]): SchemaChange[] {
    const changes: SchemaChange[] = [];
    const oldMap = new Map(oldFields.map((f) => [f.name, f]));
    const newMap = new Map(newFields.map((f) => [f.name, f]));

    // New fields
    for (const [name, field] of newMap) {
      if (!oldMap.has(name)) {
        changes.push({
          type: "field_added",
          field: name,
          details: `New ${field.type} field (${field.required ? "required" : "optional"})`,
          breaking: false, // adding fields is backward compatible
        });
      }
    }

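    // Removed fields
    // Rename heuristic: ignore case, underscores, dashes and whitespace (user_id ~ userId)
    const normalize = (n: string) => n.replace(/[_\-\s]/g, "").toLowerCase();
    for (const [name, field] of oldMap) {
      if (!newMap.has(name)) {
        // Possible rename: a brand-new field of the same type whose normalized name matches.
        // Matching on type alone would flag almost any add + remove pair as a rename.
        const possibleRename = [...newMap.entries()].find(([n, f]) =>
          !oldMap.has(n) && f.type === field.type && normalize(n) === normalize(name)
        );

        if (possibleRename) {
          changes.push({
            type: "field_renamed",
            field: name,
            details: `Possibly renamed to "${possibleRename[0]}"`,
            breaking: true,
          });
        } else {
          changes.push({
            type: "field_removed",
            field: name,
            details: `${field.type} field no longer present`,
            breaking: field.required,
          });
        }
      }
    }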

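    // Type changes (a batch where a field is all null says nothing about its real type)
    for (const [name, newField] of newMap) {
      const oldField = oldMap.get(name);
      if (
        oldField &&
        oldField.type !== newField.type &&
        oldField.type !== "null" &&
        newField.type !== "null"
      ) {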
        changes.push({
          type: "field_type_changed",
          field: name,
          details: `Type changed from ${oldField.type} to ${newField.type}`,
          breaking: true,
        });
      }
    }

    return changes;
  }

  // Register a new schema version
  async registerSchema(
    source: string,
    fields: FieldDefinition[],
    changes: SchemaChange[]
  ): Promise<SchemaVersion> {
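    const { rows: [latest] } = await pool.query(
      "SELECT version, fields FROM schema_versions WHERE source = $1 ORDER BY version DESC LIMIT 1",
      [source]
    );

    const version = (latest?.version || 0) + 1;

    // Carry addedInVersion forward from the previous version. inferSchema always returns 0,
    // so without this every field would look newly added in every version.
    // node-postgres returns JSONB already parsed; a TEXT column needs JSON.parse.
    const previous: FieldDefinition[] = latest
      ? (typeof latest.fields === "string" ? JSON.parse(latest.fields) : latest.fields)
      : [];
    const addedIn = new Map(previous.map((f) => [f.name, f.addedInVersion]));
    const fieldsWithVersion = fields.map((f) => ({
      ...f,
      addedInVersion: addedIn.get(f.name) || version,
    }));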

    const schema: SchemaVersion = {
      id: `${source}-v${version}`,
      source,
      version,
      fields: fieldsWithVersion,
      createdAt: new Date().toISOString(),
      changeDescription: changes.map((c) => `${c.type}: ${c.field} (${c.details})`).join("; "),
    };

    await pool.query(
      `INSERT INTO schema_versions (id, source, version, fields, change_description, created_at)
       VALUES ($1, $2, $3, $4, $5, NOW())`,
      [schema.id, source, version, JSON.stringify(fieldsWithVersion), schema.changeDescription]
    );

    // Always reconcile the table: on the first registration this creates it and all columns;
    // afterwards ADD COLUMN IF NOT EXISTS is a no-op for columns that already exist
    await this.autoMigrate(source, fieldsWithVersion);

    return schema;
  }

  private async autoMigrate(source: string, fields: FieldDefinition[]): Promise<void> {
    const tableName = `data_${source.replace(/[^a-z0-9]/gi, "_")}`;

    // Assumed table layout: ingestBatch writes _raw and _ingested_at on every insert
    await pool.query(
      `CREATE TABLE IF NOT EXISTS ${tableName} (_raw JSONB NOT NULL, _ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW())`
    );

    const pgTypes: Record<FieldDefinition["type"], string> = {
      string: "TEXT",
      number: "DOUBLE PRECISION",
      boolean: "BOOLEAN",
      object: "JSONB",
      array: "JSONB",
      null: "TEXT",
    };

    for (const field of fields) {
      // Only auto-migrate top-level fields; nested values live inside their parent's JSONB column
      if (field.name.includes(".")) continue;

      const pgType = pgTypes[field.type];
      // Field names come from client payloads: escape embedded double quotes in the identifier
      const column = `"${field.name.replace(/"/g, '""')}"`;

      try {
        await pool.query(`ALTER TABLE ${tableName} ADD COLUMN IF NOT EXISTS ${column} ${pgType}`);
      } catch (err) {
        console.error(`[schema] Auto-migration failed for ${field.name}:`, err);
      }
    }
  }
}

export const schemaRegistry = new SchemaRegistry();
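To see what a diff like `detectChanges` reports for the scenario above, here is a simplified, database-free sketch that infers the dominant type of each top-level field in two batches and classifies the difference. It matches renames on normalized names (case, underscores and dashes ignored), which is an assumed heuristic; `typeOf`, `inferTypes` and `classify` are hypothetical helpers, not part of the registry.

```typescript
type FieldType = "string" | "number" | "boolean" | "object" | "array" | "null";

function typeOf(value: unknown): FieldType {
  if (value === null || value === undefined) return "null";
  if (Array.isArray(value)) return "array";
  const t = typeof value;
  if (t === "string" || t === "number" || t === "boolean" || t === "object") return t;
  return "string"; // bigint, symbol and function never occur in parsed JSON
}

// Count types per top-level key and keep the most frequent one
function inferTypes(records: Record<string, unknown>[]): Map<string, FieldType> {
  const counts = new Map<string, Map<FieldType, number>>();
  for (const record of records) {
    for (const [key, value] of Object.entries(record)) {
      const perKey = counts.get(key) ?? new Map<FieldType, number>();
      const t = typeOf(value);
      perKey.set(t, (perKey.get(t) ?? 0) + 1);
      counts.set(key, perKey);
    }
  }
  const result = new Map<string, FieldType>();
  for (const [key, perKey] of counts) {
    const [dominant] = [...perKey.entries()].sort((a, b) => b[1] - a[1])[0];
    result.set(key, dominant);
  }
  return result;
}

// Hypothetical rename heuristic: user_id and userId both normalize to "userid"
const normalize = (name: string) => name.replace(/[_\-\s]/g, "").toLowerCase();

function classify(before: Map<string, FieldType>, after: Map<string, FieldType>): string[] {
  const report: string[] = [];
  for (const name of after.keys()) {
    if (!before.has(name)) report.push(`field_added: ${name}`);
  }
  for (const [name, type] of before) {
    if (after.has(name)) continue;
    const target = [...after].find(
      ([n, t]) => !before.has(n) && t === type && normalize(n) === normalize(name)
    );
    report.push(target ? `field_renamed: ${name} -> ${target[0]} (breaking)` : `field_removed: ${name}`);
  }
  return report;
}

const before = inferTypes([{ user_id: "u1", event: "click" }, { user_id: "u2", event: "view" }]);
const after = inferTypes([{ userId: "u3", event: "click", metadata: { plan: "pro" } }]);
const report = classify(before, after);
console.log(report.join("; "));
// field_added: userId; field_added: metadata; field_renamed: user_id -> userId (breaking)
```

Note that `userId` shows up both as added and as the rename target: the new column gets created automatically, while the rename is surfaced for a human to decide how old and new data should be reconciled.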

Step 2: Build the Adaptive Ingestion Pipeline

typescript
// src/pipeline/ingestion.ts — Data ingestion with automatic schema detection
import { schemaRegistry } from "../schema/registry";
import { pool } from "../db";
import { Redis } from "ioredis";

const redis = new Redis(process.env.REDIS_URL!);

export async function ingestBatch(
  source: string,
  records: Record<string, any>[]
): Promise<{ ingested: number; schemaChanged: boolean; changes: any[] }> {
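  // An empty batch would infer an empty schema and look as if every field had been removed
  if (records.length === 0) return { ingested: 0, schemaChanged: false, changes: [] };

  // Infer schema from this batch
  const inferredFields = schemaRegistry.inferSchema(records);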

  // Compare against the latest registered schema
  const { rows: [latest] } = await pool.query(
    "SELECT fields FROM schema_versions WHERE source = $1 ORDER BY version DESC LIMIT 1",
    [source]
  );

  let schemaChanged = false;
  let changes: any[] = [];

  if (latest) {
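    // node-postgres returns JSONB columns already parsed; a TEXT column needs JSON.parse
    const currentFields = typeof latest.fields === "string" ? JSON.parse(latest.fields) : latest.fields;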
    changes = schemaRegistry.detectChanges(currentFields, inferredFields);

    if (changes.length > 0) {
      schemaChanged = true;

      const hasBreaking = changes.some((c) => c.breaking);
      if (hasBreaking) {
        // Alert on breaking changes but don't block ingestion
        await redis.rpush("schema:alerts", JSON.stringify({
          source,
          changes,
          timestamp: Date.now(),
          severity: "breaking",
        }));
      }

      await schemaRegistry.registerSchema(source, inferredFields, changes);
    }
  } else {
    // First schema registration
    await schemaRegistry.registerSchema(source, inferredFields, []);
    schemaChanged = true;
  }

  // Insert records, mapping top-level keys to columns. autoMigrate only creates top-level
  // columns, so nested objects and arrays are written whole into their JSONB column.
  const tableName = `data_${source.replace(/[^a-z0-9]/gi, "_")}`;
  let ingested = 0;

  for (const record of records) {
    const columns = Object.keys(record);
    const values = columns.map((c) => {
      const v = record[c];
      // Send objects/arrays as JSON text: node-postgres would turn a JS array into a
      // Postgres array literal, which a JSONB column rejects
      return v !== null && typeof v === "object" ? JSON.stringify(v) : v;
    });

    try {
      // Field names come from client payloads: escape embedded double quotes in identifiers
      const colNames = [...columns.map((c) => `"${c.replace(/"/g, '""')}"`), "_raw", "_ingested_at"].join(", ");
      const placeholders = [...columns.map((_, i) => `$${i + 1}`), `$${columns.length + 1}`, "NOW()"].join(", ");

      await pool.query(
        `INSERT INTO ${tableName} (${colNames}) VALUES (${placeholders})`,
        [...values, JSON.stringify(record)]
      );
      ingested++;
    } catch (err: any) {
      // Store failed records for retry
      await redis.rpush(`pipeline:failed:${source}`, JSON.stringify({
        record,
        error: err.message,
        timestamp: Date.now(),
      }));
    }
  }

  return { ingested, schemaChanged, changes };
}
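The insert loop in `ingestBatch` writes each key to a column of the same name, so after a detected rename the client's data lands in a new `userId` column next to the old `user_id` one. A small alias step before the insert can route renamed keys back to the existing column. A minimal sketch; the per-source alias map is hypothetical and would be maintained by hand after a `field_renamed` alert has been reviewed.

```typescript
// Hypothetical alias map: incoming key -> existing canonical column
type AliasMap = Map<string, string>;

function applyAliases(record: Record<string, unknown>, aliases: AliasMap): Record<string, unknown> {
  const out = new Map<string, unknown>();
  for (const [key, value] of Object.entries(record)) {
    const target = aliases.get(key) ?? key;
    // If the record carries both keys, the value under the canonical key wins
    if (target !== key && out.has(target)) continue;
    out.set(target, value);
  }
  return Object.fromEntries(out);
}

const clientXAliases: AliasMap = new Map([["userId", "user_id"]]);
console.log(JSON.stringify(applyAliases({ userId: "u3", event: "click" }, clientXAliases)));
// {"user_id":"u3","event":"click"}
```

Using a `Map` for both the aliases and the intermediate result avoids surprises with keys such as `constructor` that a plain object would resolve through its prototype.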

Results

  • Zero downtime from schema changes: the pipeline auto-detects new fields and adds database columns without manual intervention, so an unannounced field like metadata no longer breaks ingestion
  • Field rename detection prevents silent data loss: when a client renamed user_id to userId, the registry flagged it as a breaking change and alerted within minutes, instead of 3 days of data being dropped
  • Schema version history for debugging — every change is tracked with version numbers, timestamps, and diffs; "when did this field appear?" is a single query
  • 30+ sources with different schemas managed automatically — each source has independent schema tracking; changes in one source don't affect others
  • Auto-migration handles 90% of schema changes — only breaking changes (type changes, renames) require manual intervention; new fields are added automatically
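The "when did this field appear?" lookup can be a single query over `schema_versions`, provided the `fields` column is JSONB; the article does not show the table definition, so that is an assumption. JSONB containment (`@>`) on an array is true when some element contains the given object, so this matches any version whose field list has an entry with that `name`. The sketch returns a node-postgres query config; `firstSeenQuery` is a hypothetical helper.

```typescript
// Shape accepted by node-postgres pool.query({ text, values })
interface QueryConfig {
  text: string;
  values: string[];
}

// Assumes schema_versions.fields is a JSONB array of FieldDefinition objects
function firstSeenQuery(source: string, field: string): QueryConfig {
  return {
    text:
      "SELECT version, created_at FROM schema_versions " +
      "WHERE source = $1 AND fields @> jsonb_build_array(jsonb_build_object('name', $2::text)) " +
      "ORDER BY version ASC LIMIT 1",
    values: [source, field],
  };
}

const q = firstSeenQuery("client-x-events", "metadata");
console.log(q.values.join(", ")); // client-x-events, metadata
```

In the pipeline this would run as `await pool.query(firstSeenQuery(source, "metadata"))`. The `$2::text` cast is there because `jsonb_build_object` accepts arguments of any type, so PostgreSQL cannot infer the parameter's type on its own.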