[TERMINAL · SKILLS]
> mounting /skills...
> indexing 295 manifests...
> linking agents: claude · codex · gemini · cursor
> ready.
[░░░░░░░░░░░░░░░░░░░░░░░░░░░░] 0%
Terminal.skills
Use Cases/Build a Data Pipeline Orchestrator with Lineage Tracking

Build a Data Pipeline Orchestrator with Lineage Tracking

Orchestrate ETL pipelines with dependency graphs, automatic retries, and data lineage tracking — so when a dashboard number looks wrong, you can trace it back to the source table in seconds.

#postgresql#database#sql#relational#jsonb
Works with:claude-codeopenai-codexgemini-clicursor

Skills stack · 7 skills

Avg quality 93/100·All SAFE
>

typescript

v

Not yet scored
View skill
>

postgresql

v1.0.0

Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

87/100 quality
1.53× impact
SAFE
View skill
>

redis

v1.0.0

Build applications with Redis — caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

93/100 quality
1.81× impact
SAFE
View skill
>

bull-mq

v1.0.0

You are an expert in BullMQ, the high-performance job queue for Node.js built on Redis. You help developers build reliable background processing systems with delayed jobs, rate limiting, prioritization, repeatable cron jobs, job dependencies, concurrency control, and dead-letter handling — powering email sending, image processing, webhook delivery, report generation, and any async workload.

93/100 quality
3.00× impact
SAFE
View skill
>

kafka-js

v

Not yet scored
View skill
>

zod

v1.0.0

You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

100/100 quality
1.21× impact
SAFE
View skill
>

hono

v1.0.0

You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge — with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

93/100 quality
3.00× impact
SAFE
View skill
$

The Problem

A data team runs 80 ETL jobs across 3 tools (cron scripts, Airflow, and manual SQL queries). When the CEO asks "why is revenue $200K lower on the dashboard?" nobody can trace it. The pipeline graph is undocumented — nobody knows that the revenue dashboard depends on 6 intermediate tables that depend on 3 source systems. When a source API changes format, 12 downstream jobs fail over 3 days as each one is discovered. Average time to root-cause a data issue: 4 hours of detective work.

Step 1: Pipeline DAG Definition

typescript
// src/orchestrator/dag.ts
import { z } from 'zod';

export const PipelineStep = z.object({
  id: z.string(),
  name: z.string(),
  type: z.enum(['sql', 'typescript', 'python', 'api_extract']),
  dependsOn: z.array(z.string()).default([]),
  inputs: z.array(z.object({
    source: z.string(),     // "postgres:raw.orders" or "api:stripe"
    table: z.string().optional(),
  })),
  outputs: z.array(z.object({
    destination: z.string(), // "postgres:analytics.daily_revenue"
    table: z.string(),
  })),
  schedule: z.string().optional(),  // cron expression
  timeoutSeconds: z.number().int().default(300),
  retries: z.number().int().default(2),
  handler: z.string(),
});

export const Pipeline = z.object({
  id: z.string(),
  name: z.string(),
  description: z.string(),
  steps: z.array(PipelineStep),
  schedule: z.string().optional(),
  owner: z.string().email(),
  slackChannel: z.string().optional(),
});

export const revenuePipeline: z.infer<typeof Pipeline> = {
  id: 'revenue-pipeline',
  name: 'Daily Revenue Pipeline',
  description: 'Extract orders, payments, and refunds → transform → load into analytics',
  owner: 'data-team@company.com',
  schedule: '0 6 * * *', // 6 AM daily
  steps: [
    {
      id: 'extract-orders',
      name: 'Extract Orders',
      type: 'sql',
      dependsOn: [],
      inputs: [{ source: 'postgres:production', table: 'orders' }],
      outputs: [{ destination: 'postgres:warehouse', table: 'raw.orders' }],
      timeoutSeconds: 120,
      retries: 2,
      handler: 'extract-orders',
    },
    {
      id: 'extract-payments',
      name: 'Extract Payments from Stripe',
      type: 'api_extract',
      dependsOn: [],
      inputs: [{ source: 'api:stripe' }],
      outputs: [{ destination: 'postgres:warehouse', table: 'raw.stripe_payments' }],
      timeoutSeconds: 180,
      retries: 3,
      handler: 'extract-stripe-payments',
    },
    {
      id: 'transform-revenue',
      name: 'Calculate Daily Revenue',
      type: 'sql',
      dependsOn: ['extract-orders', 'extract-payments'],
      inputs: [
        { source: 'postgres:warehouse', table: 'raw.orders' },
        { source: 'postgres:warehouse', table: 'raw.stripe_payments' },
      ],
      outputs: [{ destination: 'postgres:warehouse', table: 'analytics.daily_revenue' }],
      timeoutSeconds: 60,
      retries: 1,
      handler: 'transform-revenue',
    },
    {
      id: 'aggregate-metrics',
      name: 'Aggregate Business Metrics',
      type: 'sql',
      dependsOn: ['transform-revenue'],
      inputs: [{ source: 'postgres:warehouse', table: 'analytics.daily_revenue' }],
      outputs: [{ destination: 'postgres:warehouse', table: 'analytics.business_metrics' }],
      timeoutSeconds: 60,
      retries: 1,
      handler: 'aggregate-metrics',
    },
  ],
};

Step 2: DAG Executor with Lineage

typescript
// src/orchestrator/executor.ts
import { Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import { Pool } from 'pg';
import type { Pipeline, PipelineStep } from './dag';

const connection = new Redis(process.env.REDIS_URL!);
const db = new Pool({ connectionString: process.env.DATABASE_URL });

export async function executePipeline(pipeline: z.infer<typeof Pipeline>): Promise<string> {
  const runId = crypto.randomUUID();

  await db.query(`
    INSERT INTO pipeline_runs (id, pipeline_id, status, started_at)
    VALUES ($1, $2, 'running', NOW())
  `, [runId, pipeline.id]);

  // Topological sort: execute steps respecting dependencies
  const executed = new Set<string>();
  const queue = [...pipeline.steps.filter(s => s.dependsOn.length === 0)];

  while (queue.length > 0) {
    // Execute all ready steps in parallel
    const batch = queue.splice(0, queue.length);

    await Promise.all(batch.map(async (step) => {
      await executeStep(runId, pipeline.id, step);
      executed.add(step.id);

      // Record lineage
      await recordLineage(runId, step);
    }));

    // Find next steps whose dependencies are all satisfied
    for (const step of pipeline.steps) {
      if (!executed.has(step.id) && step.dependsOn.every(d => executed.has(d))) {
        queue.push(step);
      }
    }
  }

  await db.query(`UPDATE pipeline_runs SET status = 'completed', completed_at = NOW() WHERE id = $1`, [runId]);
  return runId;
}

async function executeStep(runId: string, pipelineId: string, step: z.infer<typeof PipelineStep>): Promise<void> {
  const stepRunId = crypto.randomUUID();

  await db.query(`
    INSERT INTO step_runs (id, run_id, step_id, status, started_at)
    VALUES ($1, $2, $3, 'running', NOW())
  `, [stepRunId, runId, step.id]);

  let lastError: string | null = null;

  for (let attempt = 0; attempt <= step.retries; attempt++) {
    try {
      const handler = handlers[step.handler];
      if (!handler) throw new Error(`No handler: ${step.handler}`);

      const result = await Promise.race([
        handler(),
        new Promise((_, reject) => setTimeout(() => reject(new Error('Timeout')), step.timeoutSeconds * 1000)),
      ]);

      await db.query(`
        UPDATE step_runs SET status = 'completed', completed_at = NOW(), rows_affected = $1
        WHERE id = $2
      `, [(result as any)?.rowsAffected ?? 0, stepRunId]);

      return;
    } catch (err: any) {
      lastError = err.message;
      if (attempt < step.retries) {
        await new Promise(r => setTimeout(r, 5000 * (attempt + 1)));
      }
    }
  }

  await db.query(`UPDATE step_runs SET status = 'failed', error = $1, completed_at = NOW() WHERE id = $2`, [lastError, stepRunId]);
  throw new Error(`Step ${step.id} failed: ${lastError}`);
}

async function recordLineage(runId: string, step: z.infer<typeof PipelineStep>): Promise<void> {
  for (const input of step.inputs) {
    for (const output of step.outputs) {
      await db.query(`
        INSERT INTO data_lineage (run_id, step_id, source, source_table, destination, destination_table, recorded_at)
        VALUES ($1, $2, $3, $4, $5, $6, NOW())
      `, [runId, step.id, input.source, input.table, output.destination, output.table]);
    }
  }
}

const handlers: Record<string, () => Promise<{ rowsAffected: number }>> = {};
export function registerHandler(name: string, fn: () => Promise<{ rowsAffected: number }>): void {
  handlers[name] = fn;
}

import { z } from 'zod';

Step 3: Lineage Query API

typescript
// src/api/lineage.ts
import { Hono } from 'hono';
import { Pool } from 'pg';

const app = new Hono();
const db = new Pool({ connectionString: process.env.DATABASE_URL });

// Trace upstream: "what feeds into this table?"
app.get('/v1/lineage/upstream/:table', async (c) => {
  const table = c.req.param('table');

  const { rows } = await db.query(`
    WITH RECURSIVE upstream AS (
      SELECT source, source_table, destination, destination_table, step_id, 1 as depth
      FROM data_lineage WHERE destination_table = $1
      UNION ALL
      SELECT l.source, l.source_table, l.destination, l.destination_table, l.step_id, u.depth + 1
      FROM data_lineage l JOIN upstream u ON l.destination_table = u.source_table
      WHERE u.depth < 10
    )
    SELECT DISTINCT source, source_table, destination, destination_table, depth
    FROM upstream ORDER BY depth
  `, [table]);

  return c.json({ table, upstream: rows });
});

// Trace downstream: "what breaks if this table changes?"
app.get('/v1/lineage/downstream/:table', async (c) => {
  const table = c.req.param('table');

  const { rows } = await db.query(`
    WITH RECURSIVE downstream AS (
      SELECT source, source_table, destination, destination_table, step_id, 1 as depth
      FROM data_lineage WHERE source_table = $1
      UNION ALL
      SELECT l.source, l.source_table, l.destination, l.destination_table, l.step_id, d.depth + 1
      FROM data_lineage l JOIN downstream d ON l.source_table = d.destination_table
      WHERE d.depth < 10
    )
    SELECT DISTINCT source, source_table, destination, destination_table, depth
    FROM downstream ORDER BY depth
  `, [table]);

  return c.json({ table, downstream: rows });
});

export default app;

Results

  • Root-cause time: 30 seconds with lineage trace (was 4 hours of detective work)
  • CEO's $200K question: traced to Stripe API format change in 2 minutes
  • Cascading failures: when source changes, all 12 downstream jobs alerted immediately
  • 80 ETL jobs: unified into one orchestrator with dependency-aware execution
  • Retry success: 15% of transient failures auto-recovered (were permanent before)
  • Data freshness SLA: 99.5% of tables updated by 7 AM (was untracked)
  • Impact analysis: before changing a table, see every downstream dependency instantly