Terminal.skills
Use Cases/Build a Distributed Task Scheduler with Exactly-Once Execution

Build a Distributed Task Scheduler with Exactly-Once Execution

Build a task scheduler that guarantees exactly-once execution across a cluster — no duplicate invoice sends, no missed cron jobs, and automatic failover when nodes crash.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

A billing platform runs 50 scheduled tasks: invoice generation, payment retries, subscription renewals, usage reports, dunning emails. The cron jobs run on a single server. When the server restarts during a deployment, 3-5 tasks are missed. When they added a second server for redundancy, both servers ran the same tasks — customers received duplicate invoices and double charges. One customer was charged $12K twice for their annual plan.

Step 1: Distributed Lock-Based Scheduler

typescript
// src/scheduler/distributed-scheduler.ts
import { Redis } from 'ioredis';
import { Pool } from 'pg';
import { randomUUID } from 'crypto';

const redis = new Redis(process.env.REDIS_URL!);
const db = new Pool({ connectionString: process.env.DATABASE_URL });

const NODE_ID = `node-${randomUUID().slice(0, 8)}`;

interface ScheduledTask {
  id: string;
  name: string;
  cronExpression: string;
  handler: string;
  timeoutSeconds: number;
  retryOnFailure: boolean;
  maxRetries: number;
}

// Acquire a distributed lock before executing a task
async function acquireLock(taskId: string, ttlSeconds: number): Promise<boolean> {
  const lockKey = `scheduler:lock:${taskId}`;
  const result = await redis.set(lockKey, NODE_ID, 'NX', 'EX', ttlSeconds);
  return result === 'OK';
}

async function releaseLock(taskId: string): Promise<void> {
  // Only release if we own the lock (prevent releasing another node's lock)
  const lockKey = `scheduler:lock:${taskId}`;
  await redis.eval(`
    if redis.call('get', KEYS[1]) == ARGV[1] then
      return redis.call('del', KEYS[1])
    end
    return 0
  `, 1, lockKey, NODE_ID);
}

export async function executeTask(task: ScheduledTask): Promise<{
  executed: boolean;
  executionId: string | null;
  error?: string;
}> {
  const executionId = randomUUID();

  // Step 1: Acquire lock
  const locked = await acquireLock(task.id, task.timeoutSeconds + 30);
  if (!locked) {
    return { executed: false, executionId: null }; // Another node is handling it
  }

  try {
    // Step 2: Check if already executed (idempotency)
    const { rows } = await db.query(`
      SELECT id FROM task_executions
      WHERE task_id = $1 AND scheduled_for = $2 AND status IN ('completed', 'running')
    `, [task.id, getCurrentScheduledTime(task.cronExpression)]);

    if (rows.length > 0) {
      return { executed: false, executionId: null }; // Already executed
    }

    // Step 3: Record execution start
    await db.query(`
      INSERT INTO task_executions (id, task_id, node_id, scheduled_for, status, started_at)
      VALUES ($1, $2, $3, $4, 'running', NOW())
    `, [executionId, task.id, NODE_ID, getCurrentScheduledTime(task.cronExpression)]);

    // Step 4: Run the handler
    const handler = handlers[task.handler];
    if (!handler) throw new Error(`Unknown handler: ${task.handler}`);

    await handler();

    // Step 5: Mark completed
    await db.query(`
      UPDATE task_executions SET status = 'completed', completed_at = NOW()
      WHERE id = $1
    `, [executionId]);

    return { executed: true, executionId };
  } catch (err: any) {
    await db.query(`
      UPDATE task_executions SET status = 'failed', error = $1, completed_at = NOW()
      WHERE id = $2
    `, [err.message, executionId]);

    return { executed: true, executionId, error: err.message };
  } finally {
    await releaseLock(task.id);
  }
}

function getCurrentScheduledTime(cron: string): string {
  // Snap to the nearest scheduled time for idempotency
  const now = new Date();
  now.setSeconds(0, 0);
  return now.toISOString();
}

const handlers: Record<string, () => Promise<void>> = {};
export function registerHandler(name: string, fn: () => Promise<void>): void {
  handlers[name] = fn;
}

Step 2: Heartbeat and Failover

typescript
// src/scheduler/heartbeat.ts
import { Redis } from 'ioredis';

const redis = new Redis(process.env.REDIS_URL!);
const NODE_ID = process.env.NODE_ID!;

// Report heartbeat every 10 seconds
export async function startHeartbeat(): Promise<void> {
  setInterval(async () => {
    await redis.setex(`scheduler:heartbeat:${NODE_ID}`, 30, JSON.stringify({
      nodeId: NODE_ID,
      timestamp: Date.now(),
      activeTasks: await getActiveTasks(),
    }));
  }, 10_000);
}

// Detect dead nodes and reassign their tasks
export async function checkForDeadNodes(): Promise<string[]> {
  const keys = await redis.keys('scheduler:heartbeat:*');
  const deadNodes: string[] = [];

  for (const key of keys) {
    const data = await redis.get(key);
    if (!data) continue;

    const { nodeId, timestamp } = JSON.parse(data);
    if (Date.now() - timestamp > 30_000) {
      deadNodes.push(nodeId);

      // Release all locks held by dead node
      const lockKeys = await redis.keys('scheduler:lock:*');
      for (const lockKey of lockKeys) {
        const owner = await redis.get(lockKey);
        if (owner === nodeId) {
          await redis.del(lockKey);
          console.log(`Released orphaned lock: ${lockKey} (was held by ${nodeId})`);
        }
      }

      // Mark running tasks as failed for retry
      const { Pool } = await import('pg');
      const db = new Pool({ connectionString: process.env.DATABASE_URL });
      await db.query(`
        UPDATE task_executions SET status = 'failed', error = 'Node crashed'
        WHERE node_id = $1 AND status = 'running'
      `, [nodeId]);

      await redis.del(key);
    }
  }

  return deadNodes;
}

async function getActiveTasks(): Promise<number> {
  return 0; // simplified
}

Step 3: Scheduler API

typescript
// src/api/scheduler.ts
import { Hono } from 'hono';
import { Pool } from 'pg';
import { Redis } from 'ioredis';

const app = new Hono();
const db = new Pool({ connectionString: process.env.DATABASE_URL });
const redis = new Redis(process.env.REDIS_URL!);

app.get('/v1/scheduler/status', async (c) => {
  const heartbeats = await redis.keys('scheduler:heartbeat:*');
  const nodes = [];
  for (const key of heartbeats) {
    const data = await redis.get(key);
    if (data) nodes.push(JSON.parse(data));
  }

  const { rows: [stats] } = await db.query(`
    SELECT
      COUNT(*) FILTER (WHERE status = 'completed' AND started_at > NOW() - INTERVAL '24 hours') as completed_24h,
      COUNT(*) FILTER (WHERE status = 'failed' AND started_at > NOW() - INTERVAL '24 hours') as failed_24h,
      COUNT(*) FILTER (WHERE status = 'running') as running
    FROM task_executions
  `);

  return c.json({ nodes, stats });
});

app.get('/v1/scheduler/tasks/:taskId/history', async (c) => {
  const taskId = c.req.param('taskId');
  const { rows } = await db.query(`
    SELECT * FROM task_executions WHERE task_id = $1 ORDER BY started_at DESC LIMIT 50
  `, [taskId]);
  return c.json({ executions: rows });
});

export default app;

Results

  • Duplicate invoices: zero (was 3-5/month causing double charges)
  • Missed tasks during deploys: zero — another node picks up within 30 seconds
  • The $12K double charge: impossible with exactly-once semantics
  • Node failure recovery: automatic — dead node detected in 30s, tasks reassigned
  • 50 scheduled tasks: all running reliably across 3-node cluster
  • Execution audit trail: every task run tracked with timing, status, and node ID
  • Deployment confidence: rolling restarts don't affect scheduled tasks