Build a Data Mesh with Self-Serve Analytics

Decentralize your monolithic data warehouse into domain-owned data products that teams publish and consume independently — cutting analytics request backlog from 6 weeks to same-day.

Tags: #postgresql #database #sql #relational #jsonb
Works with: claude-code, openai-codex, gemini-cli, cursor

Skills stack · 7 skills

Avg quality 92/100 · All SAFE

  • typescript (version not listed): not yet scored.
  • postgresql v1.0.0 (87/100 quality, 1.53× impact, SAFE): Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.
  • kafka-js (version not listed): not yet scored.
  • prisma v1.0.0 (87/100 quality, 1.42× impact, SAFE): You are an expert in Prisma, the TypeScript ORM with a declarative schema, auto-generated type-safe client, migrations, and studio GUI. You help developers model databases with Prisma Schema Language, generate a fully typed client that catches query errors at compile time, run zero-downtime migrations, and integrate with Postgres, MySQL, SQLite, MongoDB, CockroachDB, and PlanetScale.
  • zod v1.0.0 (100/100 quality, 1.21× impact, SAFE): You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time, eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.
  • hono v1.0.0 (93/100 quality, 3.00× impact, SAFE): You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge, with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.
  • redis v1.0.0 (93/100 quality, 1.81× impact, SAFE): Build applications with Redis: caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

The Problem

Yuki is VP of Engineering at a 200-person e-commerce company. Every analytics request goes through a 4-person data team. The backlog is 6 weeks deep. Marketing wants campaign attribution data — they wait 3 weeks. Product needs funnel metrics for a board meeting next Tuesday — they scrape it from logs manually. The data team built 400+ dbt models in a monolithic warehouse, and nobody else understands which tables are trustworthy. Last quarter, the CEO made a hiring decision based on a dashboard that was pulling from a deprecated table — the numbers were 3 months stale.

Yuki needs:

  • Domain ownership — each team (product, marketing, finance) owns and publishes their data
  • Data products — discoverable, documented, quality-guaranteed datasets with SLAs
  • Self-serve access — business users query data without filing tickets to the data team
  • Data contracts — schema and quality guarantees so consumers can trust what they read
  • Federated governance — global standards (naming, PII handling) with domain autonomy
  • Discovery catalog — find what data exists, who owns it, and whether it's fresh

Step 1: Data Product Schema

Every data product has an owner, a schema contract, quality checks, and an SLA.

typescript
// src/catalog/data-product.ts
// Defines the contract for a publishable data product

import { z } from 'zod';

export const QualityCheck = z.object({
  name: z.string(),
  type: z.enum(['not_null', 'unique', 'range', 'freshness', 'row_count', 'custom_sql']),
  column: z.string().optional(),
  params: z.record(z.string(), z.unknown()).optional(),
  severity: z.enum(['error', 'warning']),
});

export const DataProduct = z.object({
  id: z.string().regex(/^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$/),  // domain.product_name (no hyphens: the ID becomes a SQL table name)
  domain: z.string(),           // owning team: 'marketing', 'product', 'finance'
  name: z.string(),
  description: z.string(),
  owner: z.object({
    team: z.string(),
    contact: z.string().email(),
    slackChannel: z.string(),
  }),
  schema: z.array(z.object({
    column: z.string(),
    type: z.enum(['string', 'integer', 'float', 'boolean', 'timestamp', 'json', 'array']),
    description: z.string(),
    pii: z.boolean().default(false),    // personally identifiable information
    nullable: z.boolean().default(true),
  })),
  sla: z.object({
    freshness: z.enum(['realtime', '1h', '4h', '24h']),  // max data staleness
    availability: z.number().min(99).max(100),             // uptime %
    queryLatencyP95Ms: z.number().positive(),
  }),
  qualityChecks: z.array(QualityCheck),
  tags: z.array(z.string()),
  version: z.number().int().positive(),
  publishedAt: z.string().datetime(),
  status: z.enum(['draft', 'published', 'deprecated']),
});

export type DataProduct = z.infer<typeof DataProduct>;

typescript
// Example: Marketing team's campaign attribution data product
const campaignAttribution: DataProduct = {
  id: 'marketing.campaign_attribution',
  domain: 'marketing',
  name: 'Campaign Attribution',
  description: 'Multi-touch attribution for all marketing campaigns. Updated hourly. Covers web, email, and paid channels.',
  owner: {
    team: 'Marketing Analytics',
    contact: 'marketing-data@company.com',
    slackChannel: '#marketing-data',
  },
  schema: [
    { column: 'attribution_id', type: 'string', description: 'Unique attribution record ID', pii: false, nullable: false },
    { column: 'user_id', type: 'string', description: 'Anonymized user identifier', pii: true, nullable: false },
    { column: 'campaign_id', type: 'string', description: 'Campaign identifier', pii: false, nullable: false },
    { column: 'channel', type: 'string', description: 'Acquisition channel (organic, paid, email, referral, social, direct)', pii: false, nullable: false },
    { column: 'touchpoint_at', type: 'timestamp', description: 'When the user interacted with the campaign', pii: false, nullable: false },
    { column: 'conversion_at', type: 'timestamp', description: 'When the user converted (null if not converted)', pii: false, nullable: true },
    { column: 'revenue_cents', type: 'integer', description: 'Revenue attributed to this touchpoint (multi-touch weighted)', pii: false, nullable: true },
    { column: 'attribution_model', type: 'string', description: 'Model used: linear, time_decay, position_based', pii: false, nullable: false },
  ],
  sla: {
    freshness: '1h',
    availability: 99.5,
    queryLatencyP95Ms: 2000,
  },
  qualityChecks: [
    { name: 'no_null_campaign_ids', type: 'not_null', column: 'campaign_id', severity: 'error' },
    { name: 'valid_channels', type: 'custom_sql', severity: 'error',
      params: { sql: "SELECT COUNT(*) FROM marketing_campaign_attribution WHERE channel NOT IN ('organic','paid','email','referral','social','direct')" }},
    { name: 'hourly_freshness', type: 'freshness', column: 'touchpoint_at', severity: 'error',
      params: { maxStaleHours: 2 }},
    { name: 'daily_row_count', type: 'row_count', severity: 'warning',
      params: { minRows: 1000, maxRows: 500000 }},
  ],
  tags: ['marketing', 'attribution', 'campaigns', 'revenue'],
  version: 3,
  publishedAt: '2025-03-01T00:00:00Z',
  status: 'published',
};
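
Because every product carries a `version`, consumers benefit from knowing when a new version breaks the contract they rely on. The sketch below is one way to detect that, assuming a breaking change means a removed column, a changed type, or a required column becoming nullable. `findBreakingChanges` and the `Column` type are hypothetical names introduced for illustration; they are not part of the catalog code above.

```typescript
// Hypothetical helper: compare two versions of a data product schema.
// Column mirrors the shape of one entry in DataProduct['schema'].
interface Column {
  column: string;
  type: 'string' | 'integer' | 'float' | 'boolean' | 'timestamp' | 'json' | 'array';
  nullable: boolean;
}

function findBreakingChanges(previous: Column[], next: Column[]): string[] {
  const nextByName = new Map<string, Column>();
  for (const c of next) nextByName.set(c.column, c);

  const problems: string[] = [];
  for (const old of previous) {
    const current = nextByName.get(old.column);
    if (!current) {
      problems.push(`column removed: ${old.column}`);
      continue;
    }
    if (current.type !== old.type) {
      problems.push(`type changed: ${old.column} (${old.type} -> ${current.type})`);
    }
    if (!old.nullable && current.nullable) {
      problems.push(`column became nullable: ${old.column}`);
    }
  }
  // Assumption: newly added columns never break existing consumers.
  return problems;
}
```

A publish step could refuse to reuse the same `version` number whenever this list is non-empty, forcing the owning team to bump the version and notify registered consumers.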

Step 2: Data Contract Validator

Before a data product is published, validate it meets the contract.

typescript
// src/quality/contract-validator.ts
// Validates data against its product contract

import { Pool } from 'pg';
import type { DataProduct, QualityCheck } from '../catalog/data-product';

const db = new Pool({ connectionString: process.env.WAREHOUSE_URL });

interface ValidationResult {
  checkName: string;
  passed: boolean;
  severity: string;
  message: string;
  value?: number;
}

export async function validateDataProduct(product: DataProduct): Promise<{
  valid: boolean;
  results: ValidationResult[];
}> {
  const results: ValidationResult[] = [];
  const tableName = product.id.replace('.', '_');  // marketing.campaign_attribution → marketing_campaign_attribution

  for (const check of product.qualityChecks) {
    const result = await runCheck(tableName, check);
    results.push(result);
  }

  // Schema validation — check all declared columns exist with correct types
  const schemaResult = await validateSchema(tableName, product.schema);
  results.push(...schemaResult);

  const hasErrors = results.some(r => !r.passed && r.severity === 'error');

  return { valid: !hasErrors, results };
}

async function runCheck(table: string, check: QualityCheck): Promise<ValidationResult> {
  try {
    switch (check.type) {
      case 'not_null': {
        const { rows } = await db.query(
          `SELECT COUNT(*) as nulls FROM ${table} WHERE ${check.column} IS NULL`
        );
        const nullCount = parseInt(rows[0].nulls);
        return {
          checkName: check.name,
          passed: nullCount === 0,
          severity: check.severity,
          message: nullCount === 0 ? 'No nulls found' : `${nullCount} null values in ${check.column}`,
          value: nullCount,
        };
      }

      case 'freshness': {
        const maxStaleHours = (check.params?.maxStaleHours as number) ?? 24;
        const { rows } = await db.query(
          `SELECT EXTRACT(EPOCH FROM NOW() - MAX(${check.column})) / 3600 as hours_stale FROM ${table}`
        );
        const staleHours = parseFloat(rows[0].hours_stale ?? '999');
        return {
          checkName: check.name,
          passed: staleHours <= maxStaleHours,
          severity: check.severity,
          message: `Data is ${staleHours.toFixed(1)}h stale (max: ${maxStaleHours}h)`,
          value: staleHours,
        };
      }

      case 'row_count': {
        const { rows } = await db.query(`SELECT COUNT(*) as cnt FROM ${table}`);
        const count = parseInt(rows[0].cnt);
        const min = (check.params?.minRows as number) ?? 0;
        const max = (check.params?.maxRows as number) ?? Infinity;
        return {
          checkName: check.name,
          passed: count >= min && count <= max,
          severity: check.severity,
          message: `${count} rows (expected ${min}-${max})`,
          value: count,
        };
      }

      case 'custom_sql': {
        const sql = check.params?.sql as string;
        const { rows } = await db.query(sql);
        const value = parseInt(rows[0]?.count ?? '0');
        return {
          checkName: check.name,
          passed: value === 0,
          severity: check.severity,
          message: value === 0 ? 'Custom check passed' : `${value} violations found`,
          value,
        };
      }

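      default:
        // 'unique' and 'range' are declared in the contract but not handled here;
        // fail loudly rather than report an unchecked guarantee as passing.
        return {
          checkName: check.name, passed: false,
          severity: check.severity, message: `Check type '${check.type}' is not implemented`,
        };
    }
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    return {
      checkName: check.name, passed: false,
      severity: check.severity, message: `Check failed: ${reason}`,
    };
  }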
}

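// Assumed mapping from contract types to the data_type values that
// information_schema.columns reports for PostgreSQL; extend as needed.
const PG_TYPES: Record<DataProduct['schema'][number]['type'], string[]> = {
  string: ['text', 'character varying', 'character', 'uuid'],
  integer: ['smallint', 'integer', 'bigint'],
  float: ['real', 'double precision', 'numeric'],
  boolean: ['boolean'],
  timestamp: ['timestamp with time zone', 'timestamp without time zone'],
  json: ['json', 'jsonb'],
  array: ['ARRAY'],
};

async function validateSchema(
  table: string,
  expectedSchema: DataProduct['schema']
): Promise<ValidationResult[]> {
  const { rows: columns } = await db.query(`
    SELECT column_name, data_type, is_nullable
    FROM information_schema.columns
    WHERE table_name = $1
    ORDER BY ordinal_position
  `, [table]);

  const results: ValidationResult[] = [];
  for (const expected of expectedSchema) {
    const actual = columns.find((c: any) => c.column_name === expected.column);
    if (!actual) {
      results.push({
        checkName: `schema_${expected.column}`,
        passed: false,
        severity: 'error',
        message: `Column ${expected.column} not found in table`,
      });
    } else if (!PG_TYPES[expected.type].includes(actual.data_type)) {
      // The column exists but its warehouse type does not match the contract
      results.push({
        checkName: `schema_${expected.column}`,
        passed: false,
        severity: 'error',
        message: `Column ${expected.column} is ${actual.data_type}, expected ${expected.type}`,
      });
    }
  }

  return results;
}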
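
The `QualityCheck` type also declares `unique` and `range` checks, which `runCheck` does not handle, and the validator interpolates table and column names straight into SQL. One possible way to cover both gaps is to generate violation-count queries from validated identifiers. This is a sketch under stated assumptions: `quoteIdent`, `buildCheckSql`, the `CheckSpec` type, and the `min`/`max` parameter names are all hypothetical, and identifiers are restricted to lowercase snake_case.

```typescript
// Hypothetical helpers, not part of the validator above.
// Accept only plain snake_case identifiers, then double-quote them,
// so a contract cannot smuggle SQL through a table or column name.
function quoteIdent(name: string): string {
  if (!/^[a-z_][a-z0-9_]*$/.test(name)) {
    throw new Error(`Unsafe SQL identifier: ${name}`);
  }
  return `"${name}"`;
}

type CheckSpec =
  | { type: 'unique'; column: string }
  | { type: 'range'; column: string; min: number; max: number };

// Returns a query whose single "violations" value is 0 when the check passes.
function buildCheckSql(table: string, check: CheckSpec): string {
  const t = quoteIdent(table);
  const col = quoteIdent(check.column);
  switch (check.type) {
    case 'unique':
      // Non-null rows minus distinct values = number of duplicate entries
      return `SELECT COUNT(${col}) - COUNT(DISTINCT ${col}) AS violations FROM ${t}`;
    case 'range':
      if (!Number.isFinite(check.min) || !Number.isFinite(check.max)) {
        throw new Error('range check needs finite min and max');
      }
      return `SELECT COUNT(*) AS violations FROM ${t} WHERE ${col} < ${check.min} OR ${col} > ${check.max}`;
  }
}
```

The generated SQL could be run the same way as a `custom_sql` check, treating any non-zero result as a failure.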

Step 3: Data Product Catalog API

The catalog API lets teams search published products, fetch a product's full details with its quality history, list domains, and register as consumers.

typescript
// src/api/catalog.ts
// Discovery API — find, browse, and search data products

import { Hono } from 'hono';
import { PrismaClient } from '@prisma/client';

const app = new Hono();
const prisma = new PrismaClient();

// Search data products
app.get('/v1/catalog/search', async (c) => {
  const query = c.req.query('q') ?? '';
  const domain = c.req.query('domain');
  const tag = c.req.query('tag');

  const products = await prisma.dataProduct.findMany({
    where: {
      status: 'published',
      ...(domain && { domain }),
      ...(tag && { tags: { has: tag } }),
      ...(query && {
        OR: [
          { name: { contains: query, mode: 'insensitive' } },
          { description: { contains: query, mode: 'insensitive' } },
          { tags: { has: query.toLowerCase() } },
        ],
      }),
    },
    orderBy: { publishedAt: 'desc' },
  });

  return c.json({
    count: products.length,
    products: products.map(p => ({
      id: p.id,
      domain: p.domain,
      name: p.name,
      description: p.description,
      owner: p.owner,
      sla: p.sla,
      tags: p.tags,
      version: p.version,
      status: p.status,
      lastValidated: p.lastValidatedAt,
      qualityScore: p.qualityScore,  // 0-100 based on recent checks
    })),
  });
});

// Get full product details including schema and quality history
app.get('/v1/catalog/products/:id', async (c) => {
  const id = c.req.param('id');
  const product = await prisma.dataProduct.findUnique({
    where: { id },
    include: {
      qualityHistory: {
        orderBy: { checkedAt: 'desc' },
        take: 30,  // last 30 quality checks
      },
      consumers: {
        select: { team: true, useCase: true },
      },
    },
  });

  if (!product) return c.json({ error: 'Not found' }, 404);
  return c.json(product);
});

// List all domains
app.get('/v1/catalog/domains', async (c) => {
  const domains = await prisma.dataProduct.groupBy({
    by: ['domain'],
    _count: { id: true },
    where: { status: 'published' },
  });

  return c.json(domains.map(d => ({
    domain: d.domain,
    productCount: d._count.id,
  })));
});

// Register as a consumer of a data product (for lineage tracking)
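app.post('/v1/catalog/products/:id/consumers', async (c) => {
  const id = c.req.param('id');
  const { team, useCase, contact } = await c.req.json();

  // Reject incomplete registrations before they reach the database
  if (![team, useCase, contact].every(v => typeof v === 'string' && v.length > 0)) {
    return c.json({ error: 'team, useCase, and contact are required' }, 400);
  }

  await prisma.dataProductConsumer.create({
    data: { dataProductId: id, team, useCase, contact },
  });

  return c.json({ registered: true }, 201);
});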

export default app;
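
The search response exposes `qualityScore` as a 0-100 value "based on recent checks" without showing how it is derived. One plausible scheme, offered as an assumption rather than the catalog's actual formula, weights `error` checks more heavily than `warning` checks. `computeQualityScore`, `CheckOutcome`, and the weights are hypothetical.

```typescript
// Hypothetical scoring sketch: share of weighted checks that passed.
interface CheckOutcome {
  passed: boolean;
  severity: 'error' | 'warning';
}

// Assumed weights: an error-level check counts three times a warning.
const WEIGHTS = { error: 3, warning: 1 } as const;

function computeQualityScore(outcomes: CheckOutcome[]): number {
  if (outcomes.length === 0) return 0; // no evidence yet, so no credit
  let earned = 0;
  let possible = 0;
  for (const o of outcomes) {
    const weight = WEIGHTS[o.severity];
    possible += weight;
    if (o.passed) earned += weight;
  }
  return Math.round((earned / possible) * 100);
}
```

Feeding it the results of the most recent validation runs would yield a value that drops sharply when an `error` check fails and only slightly when a `warning` check fails.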

Step 4: Self-Serve Query Layer

Business users query data products through a governed SQL interface with PII masking.

typescript
// src/query/self-serve.ts
// Governed query layer — users query data products with automatic PII masking

import { Pool } from 'pg';
import type { DataProduct } from '../catalog/data-product';

// node-postgres applies statement_timeout per connection, not per query,
// so the 30-second limit is configured on the pool.
const warehouse = new Pool({
  connectionString: process.env.WAREHOUSE_URL,
  statement_timeout: 30_000,
});

const MAX_ROWS = 10_000;

interface QueryRequest {
  sql: string;
  userId: string;
  userRole: string;  // 'analyst', 'manager', 'admin'
}

export async function executeGoverned(
  request: QueryRequest,
  product: DataProduct
): Promise<{ rows: any[]; rowCount: number; queryTimeMs: number }> {
  // 1. Accept one SELECT statement that reads this product's table.
  //    These are coarse text checks, not a SQL parser; back them with a
  //    read-only warehouse role that can only see data product tables.
  const tableName = product.id.replace('.', '_');
  const userSql = request.sql.trim().replace(/;\s*$/, '');
  if (userSql.includes(';') || !/^(select|with)\b/i.test(userSql)) {
    throw new Error('Only a single SELECT statement is allowed');
  }
  if (!userSql.toLowerCase().includes(tableName)) {
    throw new Error(`Query must reference the data product table: ${tableName}`);
  }
  if (new RegExp(`\\.\\s*"?${tableName}\\b`, 'i').test(userSql)) {
    throw new Error('Schema-qualified table references would bypass masking');
  }

  // 2. Apply PII masking based on user role. Rewriting column names inside the
  //    user's SQL breaks WHERE clauses and misses SELECT *, so the table is
  //    shadowed by a CTE of the same name whose PII columns are already masked.
  const maskPii = request.userRole !== 'admin';
  const projection = product.schema
    .map(c => (c.pii && maskPii
      ? `('***' || RIGHT("${c.column}"::text, 4)) AS "${c.column}"`  // show last 4 chars
      : `"${c.column}"`))
    .join(', ');

  // 3. Wrap the user's query so the row limit always applies
  const governedSql =
    `WITH ${tableName} AS (SELECT ${projection} FROM ${tableName}) ` +
    `SELECT * FROM (${userSql}) AS governed_query LIMIT ${MAX_ROWS}`;

  // 4. Execute; the pool-level statement_timeout cancels long-running queries
  const start = Date.now();
  const result = await warehouse.query(governedSql);

  return {
    rows: result.rows,
    rowCount: result.rowCount ?? 0,
    queryTimeMs: Date.now() - start,
  };
}
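
`executeGoverned` returns `queryTimeMs`, and each product's SLA declares `queryLatencyP95Ms`, so the two can be compared. The sketch below uses the nearest-rank percentile method. `p95` and `meetsLatencySla` are hypothetical helper names, and how query times are collected and stored is left open.

```typescript
// Nearest-rank percentile: the smallest sample such that at least
// 95% of all samples are less than or equal to it.
function p95(samplesMs: number[]): number {
  if (samplesMs.length === 0) throw new Error('no samples');
  const sorted = [...samplesMs].sort((a, b) => a - b);
  const rank = Math.ceil((95 * sorted.length) / 100); // 1-based rank
  return sorted[rank - 1];
}

// Hypothetical SLA check against DataProduct.sla.queryLatencyP95Ms
function meetsLatencySla(samplesMs: number[], queryLatencyP95Ms: number): boolean {
  return p95(samplesMs) <= queryLatencyP95Ms;
}
```

With the campaign attribution product, recorded query times would be checked against its declared limit of 2000 ms.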

Results

After 4 months of data mesh adoption across 5 domains:

  • Analytics request backlog: eliminated — dropped from 6 weeks to same-day self-serve
  • Data team role shift: from 80% ad-hoc queries to 80% platform development and data quality
  • Data products published: 23 products across 5 domains (marketing, product, finance, ops, engineering)
  • Self-serve queries: 450/day by business users (was 0 — everything went through data team)
  • Data quality score: 94% average across all products (automated checks run hourly)
  • Stale data incidents: 2 in 4 months, down from 8 per month (hourly freshness checks now flag stale data within the hour)
  • CEO dashboard: now pulls from 3 validated data products with freshness guarantees, not a deprecated table
  • PII compliance: 100% of queries from non-admin users get automatic masking — zero manual reviews needed
  • Time to publish new data product: 2 days (domain team does it themselves with the template)