Terminal.skills
Use Cases/Build a Distributed Task Queue with Priorities

Build a Distributed Task Queue with Priorities

Build a Redis-backed distributed task queue with priority levels, retries with exponential backoff, dead letter queues, rate limiting, and real-time monitoring — replacing fragile cron jobs with reliable async processing.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

Farid leads backend at a 40-person SaaS. They process 50K background jobs daily — email sends, PDF generation, data exports, webhook deliveries. Everything runs via setTimeout in the main Node.js process. When the server restarts, in-flight jobs vanish. A long PDF export blocks email sends. Yesterday, 3,000 welcome emails were lost during a deployment. They need a proper distributed task queue with priorities (emails before exports), automatic retries, and jobs that survive restarts.

Step 1: Build the Queue Engine

typescript
// src/queue/task-queue.ts — Distributed task queue with Redis sorted sets for priorities
import { Redis } from "ioredis";
import { randomUUID } from "node:crypto";
import { z } from "zod";
import { pool } from "../db";

const redis = new Redis(process.env.REDIS_URL!);

const Priority = z.enum(["critical", "high", "normal", "low"]);
type Priority = z.infer<typeof Priority>;

// Priority scores — lower score = higher priority in Redis sorted set
const PRIORITY_SCORES: Record<Priority, number> = {
  critical: 1000,
  high: 2000,
  normal: 3000,
  low: 4000,
};

interface TaskDefinition {
  type: string;                   // "email.send", "pdf.generate", "export.data"
  payload: Record<string, any>;
  priority: Priority;
  maxRetries: number;
  timeoutMs: number;              // max execution time
  delay?: number;                 // delay in ms before processing
  groupKey?: string;              // rate limit group (e.g., per-customer)
  deduplicate?: string;           // deduplication key
  scheduledAt?: number;           // future execution (epoch ms)
}

interface Task extends TaskDefinition {
  id: string;
  attempt: number;
  createdAt: number;
  processAfter: number;           // epoch ms — earliest processing time
  lastError?: string;
}

const QUEUE_KEY = "queue:tasks";
const PROCESSING_KEY = "queue:processing";
const DLQ_KEY = "queue:dead";

export async function enqueue(definition: TaskDefinition): Promise<string> {
  const id = randomUUID();

  // Deduplication check
  if (definition.deduplicate) {
    const exists = await redis.get(`queue:dedup:${definition.deduplicate}`);
    if (exists) return exists; // return existing task ID
    await redis.setex(`queue:dedup:${definition.deduplicate}`, 3600, id);
  }

  const now = Date.now();
  const task: Task = {
    ...definition,
    id,
    attempt: 0,
    createdAt: now,
    processAfter: definition.scheduledAt || (now + (definition.delay || 0)),
  };

  // Score = priority * 10^10 + processAfter (so priority wins, then FIFO within priority)
  const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;

  await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));

  // Track in database for observability
  await pool.query(
    `INSERT INTO task_log (id, type, priority, status, created_at)
     VALUES ($1, $2, $3, 'queued', NOW())`,
    [id, task.type, task.priority]
  );

  return id;
}

export async function enqueueBatch(definitions: TaskDefinition[]): Promise<string[]> {
  const ids: string[] = [];
  const pipeline = redis.pipeline();

  for (const def of definitions) {
    const id = randomUUID();
    ids.push(id);

    const task: Task = {
      ...def,
      id,
      attempt: 0,
      createdAt: Date.now(),
      processAfter: def.scheduledAt || (Date.now() + (def.delay || 0)),
    };

    const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
    pipeline.zadd(QUEUE_KEY, score, JSON.stringify(task));
  }

  await pipeline.exec();
  return ids;
}

// Dequeue the highest-priority ready task
export async function dequeue(): Promise<Task | null> {
  const now = Date.now();

  // Atomic: get the highest-priority task that's ready for processing
  // Use ZPOPMIN to atomically remove from queue
  const results = await redis.zpopmin(QUEUE_KEY, 1);

  if (results.length < 2) return null;

  const task: Task = JSON.parse(results[0]);

  // Check if task is ready (not scheduled for the future)
  if (task.processAfter > now) {
    // Not ready yet — put it back
    const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
    await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
    return null;
  }

  // Rate limiting per group
  if (task.groupKey) {
    const rateLimitKey = `queue:rate:${task.groupKey}`;
    const count = await redis.incr(rateLimitKey);
    if (count === 1) await redis.expire(rateLimitKey, 60);

    if (count > 10) { // max 10 per minute per group
      // Re-queue with delay
      task.processAfter = now + 60000;
      const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
      await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
      await redis.decr(rateLimitKey);
      return null;
    }
  }

  // Move to processing set with timeout
  await redis.zadd(PROCESSING_KEY, now + task.timeoutMs, JSON.stringify(task));

  return task;
}

// Mark task as completed
export async function complete(task: Task): Promise<void> {
  await redis.zrem(PROCESSING_KEY, JSON.stringify(task));

  await pool.query(
    `UPDATE task_log SET status = 'completed', completed_at = NOW(), attempts = $2 WHERE id = $1`,
    [task.id, task.attempt]
  );
}

// Mark task as failed — retry with exponential backoff or send to DLQ
export async function fail(task: Task, error: string): Promise<void> {
  await redis.zrem(PROCESSING_KEY, JSON.stringify(task));

  task.attempt++;
  task.lastError = error;

  if (task.attempt >= task.maxRetries) {
    // Dead letter queue
    await redis.rpush(DLQ_KEY, JSON.stringify(task));
    await pool.query(
      `UPDATE task_log SET status = 'dead', error = $2, attempts = $3 WHERE id = $1`,
      [task.id, error, task.attempt]
    );
    return;
  }

  // Exponential backoff: 1s, 4s, 16s, 64s, 256s...
  const backoffMs = Math.min(1000 * Math.pow(4, task.attempt), 300000); // max 5 min
  task.processAfter = Date.now() + backoffMs;

  const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
  await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));

  await pool.query(
    `UPDATE task_log SET status = 'retrying', error = $2, attempts = $3 WHERE id = $1`,
    [task.id, error, task.attempt]
  );
}

// Recover stuck tasks (processing longer than timeout)
export async function recoverStuck(): Promise<number> {
  const now = Date.now();
  const stuck = await redis.zrangebyscore(PROCESSING_KEY, 0, now);

  for (const taskStr of stuck) {
    const task: Task = JSON.parse(taskStr);
    await fail(task, "Task timed out (exceeded processing deadline)");
  }

  return stuck.length;
}

Step 2: Build the Worker

typescript
// src/queue/worker.ts — Task worker with graceful shutdown and concurrency control
import { dequeue, complete, fail, recoverStuck } from "./task-queue";

type TaskHandler = (payload: Record<string, any>) => Promise<void>;

const handlers = new Map<string, TaskHandler>();
let running = true;
let activeJobs = 0;
const MAX_CONCURRENT = 5;

export function registerHandler(type: string, handler: TaskHandler): void {
  handlers.set(type, handler);
}

export async function startWorker(): Promise<void> {
  console.log(`[worker] Started with concurrency=${MAX_CONCURRENT}`);

  // Recover stuck tasks on startup
  const recovered = await recoverStuck();
  if (recovered > 0) console.log(`[worker] Recovered ${recovered} stuck tasks`);

  // Periodic stuck task recovery
  setInterval(() => recoverStuck(), 30000);

  while (running) {
    if (activeJobs >= MAX_CONCURRENT) {
      await sleep(100);
      continue;
    }

    const task = await dequeue();
    if (!task) {
      await sleep(500); // no tasks — poll less aggressively
      continue;
    }

    activeJobs++;
    processTask(task).finally(() => { activeJobs--; });
  }
}

async function processTask(task: any): Promise<void> {
  const handler = handlers.get(task.type);

  if (!handler) {
    await fail(task, `No handler registered for task type: ${task.type}`);
    return;
  }

  const startTime = Date.now();

  try {
    // Timeout enforcement
    await Promise.race([
      handler(task.payload),
      sleep(task.timeoutMs).then(() => {
        throw new Error(`Task exceeded timeout of ${task.timeoutMs}ms`);
      }),
    ]);

    await complete(task);
    console.log(`[worker] ✓ ${task.type} (${task.id}) in ${Date.now() - startTime}ms`);
  } catch (err: any) {
    await fail(task, err.message);
    console.error(`[worker] ✗ ${task.type} (${task.id}) attempt ${task.attempt}: ${err.message}`);
  }
}

// Graceful shutdown — finish active tasks, stop accepting new ones
export async function stopWorker(): Promise<void> {
  running = false;
  console.log(`[worker] Shutting down, waiting for ${activeJobs} active jobs...`);

  const deadline = Date.now() + 30000;
  while (activeJobs > 0 && Date.now() < deadline) {
    await sleep(100);
  }

  if (activeJobs > 0) {
    console.error(`[worker] Force shutdown with ${activeJobs} jobs still running`);
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

Results

  • Zero lost jobs since deployment — tasks survive server restarts, deployments, and crashes; the 3,000 lost emails scenario is impossible
  • Critical emails delivered in <5 seconds — priority queue ensures email sends (critical) aren't blocked by PDF exports (low); average email delivery went from 45s to 3s
  • Automatic retry saved 12% of failed tasks — transient errors (API timeouts, rate limits) resolve on retry with exponential backoff; only truly broken tasks reach the dead letter queue
  • Rate limiting prevents customer abuse — groupKey-based rate limiting ensures one customer's 10K-item export doesn't consume all worker capacity
  • Dead letter queue catches persistent failures — instead of silently failing, broken tasks are preserved for debugging and can be retried after fixes