[TERMINAL · SKILLS]
> mounting /skills...
> indexing 295 manifests...
> linking agents: claude · codex · gemini · cursor
> ready.
[░░░░░░░░░░░░░░░░░░░░░░░░░░░░] 0%
Terminal.skills
Use Cases/Build a Schema Registry for Event-Driven Architecture

Build a Schema Registry for Event-Driven Architecture

Prevent breaking changes in event schemas from crashing downstream consumers — with versioned schemas, compatibility checks, and automated contract testing across 30 microservices.

#postgresql#database#sql#relational#jsonb
Works with:claude-codeopenai-codexgemini-clicursor

Skills stack · 6 skills

Avg quality 90/100·All SAFE
>

typescript

v

Not yet scored
View skill
>

kafka-js

v

Not yet scored
View skill
>

postgresql

v1.0.0

Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

87/100 quality
1.53× impact
SAFE
View skill
>

zod

v1.0.0

You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

100/100 quality
1.21× impact
SAFE
View skill
>

hono

v1.0.0

You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge — with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

93/100 quality
3.00× impact
SAFE
View skill
>

vitest

v1.0.0

You are an expert in Vitest, the Vite-native testing framework. You help developers write and run unit tests, integration tests, and component tests with native TypeScript support, Jest-compatible API, built-in mocking, code coverage, snapshot testing, and watch mode — leveraging Vite's transform pipeline for instant test execution without separate compilation.

80/100 quality
1.64× impact
SAFE
View skill
$

The Problem

30 microservices communicate through Kafka events. No schema validation — producers send whatever JSON they want, consumers break when fields change. Last month, the user service renamed user_name to username in a user.updated event. 4 downstream services crashed. The analytics pipeline silently dropped 3 days of data because it couldn't parse the new format. Nobody knows what events exist, what they contain, or who produces/consumes them.

Step 1: Schema Storage and Versioning

typescript
// src/registry/store.ts
import { Pool } from 'pg';
import { z } from 'zod';

const db = new Pool({ connectionString: process.env.DATABASE_URL });

export const SchemaEntry = z.object({
  subject: z.string(),         // e.g., "user.updated"
  version: z.number().int(),
  schema: z.string(),          // JSON Schema or Zod schema as JSON
  compatibility: z.enum(['backward', 'forward', 'full', 'none']),
  producer: z.string(),        // service that produces this event
  consumers: z.array(z.string()),
  description: z.string(),
  createdBy: z.string(),
  createdAt: z.string().datetime(),
});

export async function registerSchema(entry: z.infer<typeof SchemaEntry>): Promise<{
  registered: boolean;
  errors?: string[];
}> {
  // Fetch latest version
  const { rows } = await db.query(
    `SELECT schema, version FROM schemas WHERE subject = $1 ORDER BY version DESC LIMIT 1`,
    [entry.subject]
  );

  // Check compatibility
  if (rows[0] && entry.compatibility !== 'none') {
    const errors = checkCompatibility(
      JSON.parse(rows[0].schema),
      JSON.parse(entry.schema),
      entry.compatibility
    );
    if (errors.length > 0) {
      return { registered: false, errors };
    }
  }

  const version = rows[0] ? rows[0].version + 1 : 1;

  await db.query(`
    INSERT INTO schemas (subject, version, schema, compatibility, producer, consumers, description, created_by, created_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
  `, [entry.subject, version, entry.schema, entry.compatibility,
      entry.producer, entry.consumers, entry.description, entry.createdBy]);

  return { registered: true };
}

function checkCompatibility(
  oldSchema: any,
  newSchema: any,
  mode: string
): string[] {
  const errors: string[] = [];

  if (mode === 'backward' || mode === 'full') {
    // New consumers must read old data: new schema can't add required fields
    const oldRequired = new Set(oldSchema.required ?? []);
    const newRequired = new Set(newSchema.required ?? []);

    for (const field of newRequired) {
      if (!oldRequired.has(field) && !oldSchema.properties?.[field]) {
        errors.push(`BACKWARD: New required field '${field}' not in old schema — old data won't have it`);
      }
    }
  }

  if (mode === 'forward' || mode === 'full') {
    // Old consumers must read new data: can't remove fields old consumers use
    const oldFields = Object.keys(oldSchema.properties ?? {});
    const newFields = new Set(Object.keys(newSchema.properties ?? {}));

    for (const field of oldFields) {
      if (!newFields.has(field)) {
        errors.push(`FORWARD: Removed field '${field}' — old consumers still expect it`);
      }
    }

    // Can't change field types
    for (const field of oldFields) {
      if (newSchema.properties?.[field]) {
        const oldType = oldSchema.properties[field].type;
        const newType = newSchema.properties[field].type;
        if (oldType !== newType) {
          errors.push(`FORWARD: Changed type of '${field}' from '${oldType}' to '${newType}'`);
        }
      }
    }
  }

  return errors;
}

Step 2: Kafka Serializer/Deserializer

typescript
// src/registry/serde.ts
import Ajv from 'ajv';
import { Pool } from 'pg';

const db = new Pool({ connectionString: process.env.DATABASE_URL });
const ajv = new Ajv({ allErrors: true });

// Cache compiled validators
const validatorCache = new Map<string, any>();

export async function serialize(
  subject: string,
  data: Record<string, unknown>
): Promise<Buffer> {
  const schema = await getLatestSchema(subject);
  const validate = getValidator(subject, schema);

  if (!validate(data)) {
    const errors = validate.errors?.map(e => `${e.instancePath} ${e.message}`).join('; ');
    throw new Error(`Schema validation failed for ${subject}: ${errors}`);
  }

  // Envelope: version prefix + JSON payload
  const version = await getLatestVersion(subject);
  const payload = JSON.stringify(data);
  const envelope = Buffer.alloc(5 + payload.length);
  envelope.writeUInt8(0, 0);         // magic byte
  envelope.writeUInt32BE(version, 1); // schema version
  envelope.write(payload, 5);
  return envelope;
}

export async function deserialize(
  subject: string,
  buffer: Buffer
): Promise<{ version: number; data: Record<string, unknown> }> {
  const magic = buffer.readUInt8(0);
  if (magic !== 0) throw new Error('Invalid envelope magic byte');

  const version = buffer.readUInt32BE(1);
  const payload = buffer.subarray(5).toString();
  const data = JSON.parse(payload);

  // Validate against the version it was produced with
  const schema = await getSchemaByVersion(subject, version);
  const validate = getValidator(`${subject}:${version}`, schema);

  if (!validate(data)) {
    console.warn(`Schema validation warning for ${subject} v${version}:`, validate.errors);
  }

  return { version, data };
}

function getValidator(key: string, schema: any): any {
  if (!validatorCache.has(key)) {
    validatorCache.set(key, ajv.compile(schema));
  }
  return validatorCache.get(key);
}

async function getLatestSchema(subject: string): Promise<any> {
  const { rows } = await db.query(
    `SELECT schema FROM schemas WHERE subject = $1 ORDER BY version DESC LIMIT 1`,
    [subject]
  );
  if (!rows[0]) throw new Error(`No schema registered for ${subject}`);
  return JSON.parse(rows[0].schema);
}

async function getSchemaByVersion(subject: string, version: number): Promise<any> {
  const { rows } = await db.query(
    `SELECT schema FROM schemas WHERE subject = $1 AND version = $2`,
    [subject, version]
  );
  if (!rows[0]) throw new Error(`Schema ${subject} v${version} not found`);
  return JSON.parse(rows[0].schema);
}

async function getLatestVersion(subject: string): Promise<number> {
  const { rows } = await db.query(
    `SELECT version FROM schemas WHERE subject = $1 ORDER BY version DESC LIMIT 1`,
    [subject]
  );
  return rows[0]?.version ?? 1;
}

Step 3: CI Contract Testing

typescript
// src/registry/contract-test.ts
import { describe, test, expect } from 'vitest';

// Auto-generated contract tests from registry
export function generateContractTests(
  subject: string,
  producerSamples: Record<string, unknown>[],
  schema: any
): void {
  const Ajv = require('ajv');
  const ajv = new Ajv({ allErrors: true });
  const validate = ajv.compile(schema);

  describe(`Contract: ${subject}`, () => {
    for (const [i, sample] of producerSamples.entries()) {
      test(`sample ${i + 1} matches schema`, () => {
        const valid = validate(sample);
        expect(valid).toBe(true);
        if (!valid) {
          console.error(validate.errors);
        }
      });
    }

    test('schema is backward compatible with previous version', async () => {
      // Fetch previous version and verify
      const prevSchema = await getSchemaByVersion(subject, schema.version - 1).catch(() => null);
      if (!prevSchema) return; // first version

      const errors = checkCompatibility(prevSchema, schema, 'backward');
      expect(errors).toEqual([]);
    });
  });
}

Results

  • Breaking changes caught in CI: 8 in the first month (would have crashed production)
  • Schema catalog: complete inventory of 120+ event types across 30 services
  • The user_nameusername incident: impossible now — compatibility check blocks it
  • Data pipeline reliability: zero silent data loss (was 3 days last incident)
  • Developer onboarding: new engineers discover events via registry, not tribal knowledge
  • Schema evolution: safe field additions, deprecations, and type migrations
  • Consumer confidence: deserializer validates every message against its exact schema version