Build a Distributed Task Scheduler with Exactly-Once Execution

Build a task scheduler that guarantees exactly-once execution across a cluster — no duplicate invoice sends, no missed cron jobs, and automatic failover when nodes crash.

Tags: redis, caching, database, pub-sub, queues
Works with: claude-code, openai-codex, gemini-cli, cursor

Skills stack · 6 skills · Avg quality 93/100 · All SAFE

  • typescript (not yet scored)

  • redis v1.0.0 · 93/100 quality · 1.81× impact · SAFE
    Build applications with Redis: caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

  • postgresql v1.0.0 · 87/100 quality · 1.53× impact · SAFE
    Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

  • bull-mq v1.0.0 · 93/100 quality · 3.00× impact · SAFE
    You are an expert in BullMQ, the high-performance job queue for Node.js built on Redis. You help developers build reliable background processing systems with delayed jobs, rate limiting, prioritization, repeatable cron jobs, job dependencies, concurrency control, and dead-letter handling, powering email sending, image processing, webhook delivery, report generation, and any async workload.

  • zod v1.0.0 · 100/100 quality · 1.21× impact · SAFE
    You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time, eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

  • hono v1.0.0 · 93/100 quality · 3.00× impact · SAFE
    You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge, with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

The Problem

A billing platform runs 50 scheduled tasks: invoice generation, payment retries, subscription renewals, usage reports, and dunning emails. All of the cron jobs run on a single server, so when that server restarts during a deployment, 3-5 tasks are missed. When the team added a second server for redundancy, both servers ran every task, and customers received duplicate invoices and double charges. One customer was charged $12K twice for their annual plan.

Step 1: Distributed Lock-Based Scheduler
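
The queries in this step read and write a task_executions table that the article never defines. The sketch below is one possible schema, not the author's: the column names are taken from the queries, while the column types, the `Queryable` helper type, the `migrate` function, and the index name are assumptions made for illustration. The partial unique index is an optional safety net that lets PostgreSQL itself reject a second running or completed row for the same task and schedule slot, in case the Redis lock expires early.

```typescript
// Hypothetical migration for the task_executions table used in this step.
// Column names come from the article's queries; types and the index are assumptions.
export const TASK_EXECUTIONS_DDL = `
CREATE TABLE IF NOT EXISTS task_executions (
  id            UUID PRIMARY KEY,
  task_id       TEXT NOT NULL,
  node_id       TEXT NOT NULL,
  scheduled_for TIMESTAMPTZ NOT NULL,
  status        TEXT NOT NULL CHECK (status IN ('running', 'completed', 'failed')),
  error         TEXT,
  started_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at  TIMESTAMPTZ
);

-- At most one running or completed execution per task and schedule slot.
CREATE UNIQUE INDEX IF NOT EXISTS task_executions_one_per_slot
  ON task_executions (task_id, scheduled_for)
  WHERE status IN ('running', 'completed');
`;

// Minimal structural type so the sketch does not depend on a specific client;
// a pg Pool should satisfy it.
interface Queryable {
  query(sql: string): Promise<unknown>;
}

// Runs the DDL once; both statements are idempotent thanks to IF NOT EXISTS.
export async function migrate(db: Queryable): Promise<void> {
  await db.query(TASK_EXECUTIONS_DDL);
}
```

With an index like this in place, a second INSERT for the same slot fails with PostgreSQL error code 23505 (unique_violation), which the scheduler could treat as "already executed" instead of as a failure.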

typescript
// src/scheduler/distributed-scheduler.ts
import { Redis } from 'ioredis';
import { Pool } from 'pg';
import { randomUUID } from 'crypto';

const redis = new Redis(process.env.REDIS_URL!);
const db = new Pool({ connectionString: process.env.DATABASE_URL });

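// Must match the NODE_ID used by the heartbeat module so failover can map locks to nodes;
// falls back to a random id for local runs.
const NODE_ID = process.env.NODE_ID ?? `node-${randomUUID().slice(0, 8)}`;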

interface ScheduledTask {
  id: string;
  name: string;
  cronExpression: string;
  handler: string;
  timeoutSeconds: number;
  retryOnFailure: boolean;
  maxRetries: number;
}

// Acquire a distributed lock before executing a task
async function acquireLock(taskId: string, ttlSeconds: number): Promise<boolean> {
  const lockKey = `scheduler:lock:${taskId}`;
  const result = await redis.set(lockKey, NODE_ID, 'NX', 'EX', ttlSeconds);
  return result === 'OK';
}

async function releaseLock(taskId: string): Promise<void> {
  // Only release if we own the lock (prevent releasing another node's lock)
  const lockKey = `scheduler:lock:${taskId}`;
  await redis.eval(`
    if redis.call('get', KEYS[1]) == ARGV[1] then
      return redis.call('del', KEYS[1])
    end
    return 0
  `, 1, lockKey, NODE_ID);
}

export async function executeTask(task: ScheduledTask): Promise<{
  executed: boolean;
  executionId: string | null;
  error?: string;
}> {
  const executionId = randomUUID();

  // Step 1: Acquire lock
  const locked = await acquireLock(task.id, task.timeoutSeconds + 30);
  if (!locked) {
    return { executed: false, executionId: null }; // Another node is handling it
  }

  try {
    // Compute the schedule slot once so the check and the insert agree,
    // even if the clock crosses a minute boundary between the two queries.
    const scheduledFor = getCurrentScheduledTime(task.cronExpression);

    // Step 2: Check if already executed (idempotency)
    const { rows } = await db.query(`
      SELECT id FROM task_executions
      WHERE task_id = $1 AND scheduled_for = $2 AND status IN ('completed', 'running')
    `, [task.id, scheduledFor]);

    if (rows.length > 0) {
      return { executed: false, executionId: null }; // Already executed
    }

    // Step 3: Record execution start
    await db.query(`
      INSERT INTO task_executions (id, task_id, node_id, scheduled_for, status, started_at)
      VALUES ($1, $2, $3, $4, 'running', NOW())
    `, [executionId, task.id, NODE_ID, scheduledFor]);

    // Step 4: Run the handler
    const handler = handlers[task.handler];
    if (!handler) throw new Error(`Unknown handler: ${task.handler}`);

    await handler();

    // Step 5: Mark completed
    await db.query(`
      UPDATE task_executions SET status = 'completed', completed_at = NOW()
      WHERE id = $1
    `, [executionId]);

    return { executed: true, executionId };
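  } catch (err: unknown) {
    const message = err instanceof Error ? err.message : String(err);

    // Updates zero rows if the failure happened before the execution row was inserted
    await db.query(`
      UPDATE task_executions SET status = 'failed', error = $1, completed_at = NOW()
      WHERE id = $2
    `, [message, executionId]);

    return { executed: true, executionId, error: message };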
  } finally {
    await releaseLock(task.id);
  }
}

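function getCurrentScheduledTime(_cron: string): string {
  // Simplified: truncates the current time to the start of the minute and uses that
  // as the idempotency slot. The cron expression is not parsed here, so this is only
  // correct when executeTask is called during the minute in which the task is due.
  const now = new Date();
  now.setSeconds(0, 0);
  return now.toISOString();
}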

const handlers: Record<string, () => Promise<void>> = {};
export function registerHandler(name: string, fn: () => Promise<void>): void {
  handlers[name] = fn;
}
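
The lock TTL in executeTask is derived from task.timeoutSeconds, but nothing in the code above stops a handler that runs longer than that, so the lock can expire while the handler is still working. A minimal sketch of one way to bound the wait is shown below. `withTimeout` is a hypothetical helper, not part of the article's code or of any library.

```typescript
// Reject if `work` does not settle within `timeoutMs`.
// Note: this only stops waiting; it does not cancel the underlying work.
export function withTimeout<T>(
  work: Promise<T>,
  timeoutMs: number,
  label = 'task',
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`${label} timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });

  // Whichever promise settles first wins; the timer is always cleared afterwards
  // so a fast task does not keep the process alive.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

In executeTask, the handler call could then become `await withTimeout(handler(), task.timeoutSeconds * 1000, task.name)`. Because the timed-out handler keeps running in the background, handlers that charge customers or send email should still be idempotent on their own.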

Step 2: Heartbeat and Failover

typescript
// src/scheduler/heartbeat.ts
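import { Redis } from 'ioredis';
import { Pool } from 'pg';

const redis = new Redis(process.env.REDIS_URL!);
// One shared pool; creating a new Pool for every dead node would leak connections
const db = new Pool({ connectionString: process.env.DATABASE_URL });
const NODE_ID = process.env.NODE_ID!;

const HEARTBEAT_INTERVAL_MS = 10_000;
const STALE_AFTER_MS = 30_000;
// The key must outlive the staleness threshold. With a 30 s TTL it would expire
// before checkForDeadNodes could ever read a timestamp older than 30 s.
const HEARTBEAT_TTL_SECONDS = 120;

// Report heartbeat every 10 seconds
export async function startHeartbeat(): Promise<void> {
  setInterval(async () => {
    try {
      await redis.setex(`scheduler:heartbeat:${NODE_ID}`, HEARTBEAT_TTL_SECONDS, JSON.stringify({
        nodeId: NODE_ID,
        timestamp: Date.now(),
        activeTasks: await getActiveTasks(),
      }));
    } catch (err) {
      // A missed beat is tolerable; an unhandled rejection in the timer is not
      console.error('Heartbeat failed:', err);
    }
  }, HEARTBEAT_INTERVAL_MS);
}

// Iterate matching keys with SCAN; KEYS blocks Redis while it walks the whole keyspace
async function scanKeys(pattern: string): Promise<string[]> {
  const found: string[] = [];
  let cursor = '0';
  do {
    const [next, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
    cursor = next;
    found.push(...keys);
  } while (cursor !== '0');
  return found;
}

// Detect dead nodes and reassign their tasks
export async function checkForDeadNodes(): Promise<string[]> {
  const deadNodes: string[] = [];

  for (const key of await scanKeys('scheduler:heartbeat:*')) {
    const data = await redis.get(key);
    if (!data) continue;

    const { nodeId, timestamp } = JSON.parse(data);
    if (Date.now() - timestamp <= STALE_AFTER_MS) continue;
    deadNodes.push(nodeId);

    // Release all locks held by the dead node. The compare-and-delete runs as one
    // script, so a lock re-acquired by a live node in the meantime is left alone.
    for (const lockKey of await scanKeys('scheduler:lock:*')) {
      const released = await redis.eval(`
        if redis.call('get', KEYS[1]) == ARGV[1] then
          return redis.call('del', KEYS[1])
        end
        return 0
      `, 1, lockKey, nodeId);
      if (released === 1) {
        console.log(`Released orphaned lock: ${lockKey} (was held by ${nodeId})`);
      }
    }

    // Mark running tasks as failed for retry
    await db.query(`
      UPDATE task_executions SET status = 'failed', error = 'Node crashed'
      WHERE node_id = $1 AND status = 'running'
    `, [nodeId]);

    await redis.del(key);
  }

  return deadNodes;
}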

async function getActiveTasks(): Promise<number> {
  return 0; // simplified
}
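
The staleness rule inside checkForDeadNodes is easier to unit-test when pulled out as a pure function. The sketch below assumes the heartbeat payload shape written by startHeartbeat; `findStaleNodes` and the `Heartbeat` type are hypothetical names introduced here. One detail worth checking against it: a heartbeat key is only readable until its Redis TTL expires, so the TTL has to be longer than the staleness threshold for a dead node to be observed at all.

```typescript
interface Heartbeat {
  nodeId: string;
  timestamp: number; // epoch milliseconds, as written by startHeartbeat
}

// Return the ids of nodes whose last heartbeat is older than staleAfterMs.
export function findStaleNodes(
  heartbeats: Heartbeat[],
  nowMs: number,
  staleAfterMs = 30_000,
): string[] {
  return heartbeats
    .filter((hb) => nowMs - hb.timestamp > staleAfterMs)
    .map((hb) => hb.nodeId);
}

// Example: with a 10 s heartbeat interval, node-b has missed several beats.
const now = 1_000_000;
const stale = findStaleNodes(
  [
    { nodeId: 'node-a', timestamp: now - 5_000 },
    { nodeId: 'node-b', timestamp: now - 45_000 },
  ],
  now,
);
console.log(stale); // only node-b is reported as stale
```

Passing the current time in as an argument keeps the function deterministic, so a test can pin the clock without mocking Date.now().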

Step 3: Scheduler API

typescript
// src/api/scheduler.ts
import { Hono } from 'hono';
import { Pool } from 'pg';
import { Redis } from 'ioredis';

const app = new Hono();
const db = new Pool({ connectionString: process.env.DATABASE_URL });
const redis = new Redis(process.env.REDIS_URL!);

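app.get('/v1/scheduler/status', async (c) => {
  // SCAN instead of KEYS so the status endpoint never blocks Redis on a large keyspace
  const nodes: unknown[] = [];
  let cursor = '0';
  do {
    const [next, keys] = await redis.scan(cursor, 'MATCH', 'scheduler:heartbeat:*', 'COUNT', 100);
    cursor = next;
    if (keys.length > 0) {
      for (const data of await redis.mget(...keys)) {
        if (data) nodes.push(JSON.parse(data));
      }
    }
  } while (cursor !== '0');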

  const { rows: [stats] } = await db.query(`
    SELECT
      COUNT(*) FILTER (WHERE status = 'completed' AND started_at > NOW() - INTERVAL '24 hours') as completed_24h,
      COUNT(*) FILTER (WHERE status = 'failed' AND started_at > NOW() - INTERVAL '24 hours') as failed_24h,
      COUNT(*) FILTER (WHERE status = 'running') as running
    FROM task_executions
  `);

  return c.json({ nodes, stats });
});

app.get('/v1/scheduler/tasks/:taskId/history', async (c) => {
  const taskId = c.req.param('taskId');
  const { rows } = await db.query(`
    SELECT * FROM task_executions WHERE task_id = $1 ORDER BY started_at DESC LIMIT 50
  `, [taskId]);
  return c.json({ executions: rows });
});

export default app;
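
One thing to watch when consuming the status endpoint: COUNT(*) is a bigint in PostgreSQL, and node-postgres returns bigint values as strings by default, so the `stats` object carries values like "98" instead of 98. A small sketch of normalizing that row is shown below. The `RawStats` and `SchedulerStats` types and the `toSchedulerStats` function are hypothetical names, and the failure-rate field is an illustrative addition that is not in the original API.

```typescript
// Shape of the `stats` row as node-postgres returns it by default.
interface RawStats {
  completed_24h: string;
  failed_24h: string;
  running: string;
}

interface SchedulerStats {
  completed24h: number;
  failed24h: number;
  running: number;
  failureRate24h: number; // failed / (completed + failed); 0 when nothing finished
}

// Convert the string counts to numbers and derive a failure rate.
export function toSchedulerStats(raw: RawStats): SchedulerStats {
  const completed24h = Number(raw.completed_24h);
  const failed24h = Number(raw.failed_24h);
  const finished = completed24h + failed24h;
  return {
    completed24h,
    failed24h,
    running: Number(raw.running),
    failureRate24h: finished === 0 ? 0 : failed24h / finished,
  };
}

const example = toSchedulerStats({ completed_24h: '98', failed_24h: '2', running: '1' });
console.log(example.failureRate24h); // 0.02
```

The conversion could live either in the route handler, before `c.json`, or in the dashboard that calls the endpoint.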

Results

  • Duplicate invoices: zero (was 3-5/month causing double charges)
  • Missed tasks during deploys: zero — another node picks up within 30 seconds
  • The $12K double charge: impossible with exactly-once semantics
  • Node failure recovery: automatic — dead node detected in 30s, tasks reassigned
  • 50 scheduled tasks: all running reliably across 3-node cluster
  • Execution audit trail: every task run tracked with timing, status, and node ID
  • Deployment confidence: rolling restarts don't affect scheduled tasks