The Problem
Farid leads backend at a 40-person SaaS. They process 50K background jobs daily — email sends, PDF generation, data exports, webhook deliveries. Everything runs via setTimeout in the main Node.js process. When the server restarts, in-flight jobs vanish. A long PDF export blocks email sends. Yesterday, 3,000 welcome emails were lost during a deployment. They need a proper distributed task queue with priorities (emails before exports), automatic retries, and jobs that survive restarts.
Step 1: Build the Queue Engine
typescript
// src/queue/task-queue.ts — Distributed task queue with Redis sorted sets for priorities
import { Redis } from "ioredis";
import { randomUUID } from "node:crypto";
import { z } from "zod";
import { pool } from "../db";
const redis = new Redis(process.env.REDIS_URL!);
const Priority = z.enum(["critical", "high", "normal", "low"]);
type Priority = z.infer<typeof Priority>;
// Priority scores — lower score = higher priority in Redis sorted set
const PRIORITY_SCORES: Record<Priority, number> = {
critical: 1000,
high: 2000,
normal: 3000,
low: 4000,
};
interface TaskDefinition {
type: string; // "email.send", "pdf.generate", "export.data"
payload: Record<string, any>;
priority: Priority;
maxRetries: number;
timeoutMs: number; // max execution time
delay?: number; // delay in ms before processing
groupKey?: string; // rate limit group (e.g., per-customer)
deduplicate?: string; // deduplication key
scheduledAt?: number; // future execution (epoch ms)
}
interface Task extends TaskDefinition {
id: string;
attempt: number;
createdAt: number;
processAfter: number; // epoch ms — earliest processing time
lastError?: string;
}
const QUEUE_KEY = "queue:tasks";
const PROCESSING_KEY = "queue:processing";
const DLQ_KEY = "queue:dead";
export async function enqueue(definition: TaskDefinition): Promise<string> {
const id = randomUUID();
// Deduplication check
if (definition.deduplicate) {
const exists = await redis.get(`queue:dedup:${definition.deduplicate}`);
if (exists) return exists; // return existing task ID
await redis.setex(`queue:dedup:${definition.deduplicate}`, 3600, id);
}
const now = Date.now();
const task: Task = {
...definition,
id,
attempt: 0,
createdAt: now,
processAfter: definition.scheduledAt || (now + (definition.delay || 0)),
};
// Score = priority * 10^10 + processAfter (so priority wins, then FIFO within priority)
const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
// Track in database for observability
await pool.query(
`INSERT INTO task_log (id, type, priority, status, created_at)
VALUES ($1, $2, $3, 'queued', NOW())`,
[id, task.type, task.priority]
);
return id;
}
export async function enqueueBatch(definitions: TaskDefinition[]): Promise<string[]> {
const ids: string[] = [];
const pipeline = redis.pipeline();
for (const def of definitions) {
const id = randomUUID();
ids.push(id);
const task: Task = {
...def,
id,
attempt: 0,
createdAt: Date.now(),
processAfter: def.scheduledAt || (Date.now() + (def.delay || 0)),
};
const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
pipeline.zadd(QUEUE_KEY, score, JSON.stringify(task));
}
await pipeline.exec();
return ids;
}
// Dequeue the highest-priority ready task
export async function dequeue(): Promise<Task | null> {
const now = Date.now();
// Atomic: get the highest-priority task that's ready for processing
// Use ZPOPMIN to atomically remove from queue
const results = await redis.zpopmin(QUEUE_KEY, 1);
if (results.length < 2) return null;
const task: Task = JSON.parse(results[0]);
// Check if task is ready (not scheduled for the future)
if (task.processAfter > now) {
// Not ready yet — put it back
const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
return null;
}
// Rate limiting per group
if (task.groupKey) {
const rateLimitKey = `queue:rate:${task.groupKey}`;
const count = await redis.incr(rateLimitKey);
if (count === 1) await redis.expire(rateLimitKey, 60);
if (count > 10) { // max 10 per minute per group
// Re-queue with delay
task.processAfter = now + 60000;
const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
await redis.decr(rateLimitKey);
return null;
}
}
// Move to processing set with timeout
await redis.zadd(PROCESSING_KEY, now + task.timeoutMs, JSON.stringify(task));
return task;
}
// Mark task as completed
export async function complete(task: Task): Promise<void> {
await redis.zrem(PROCESSING_KEY, JSON.stringify(task));
await pool.query(
`UPDATE task_log SET status = 'completed', completed_at = NOW(), attempts = $2 WHERE id = $1`,
[task.id, task.attempt]
);
}
// Mark task as failed — retry with exponential backoff or send to DLQ
export async function fail(task: Task, error: string): Promise<void> {
await redis.zrem(PROCESSING_KEY, JSON.stringify(task));
task.attempt++;
task.lastError = error;
if (task.attempt >= task.maxRetries) {
// Dead letter queue
await redis.rpush(DLQ_KEY, JSON.stringify(task));
await pool.query(
`UPDATE task_log SET status = 'dead', error = $2, attempts = $3 WHERE id = $1`,
[task.id, error, task.attempt]
);
return;
}
// Exponential backoff: 1s, 4s, 16s, 64s, 256s...
const backoffMs = Math.min(1000 * Math.pow(4, task.attempt), 300000); // max 5 min
task.processAfter = Date.now() + backoffMs;
const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
await pool.query(
`UPDATE task_log SET status = 'retrying', error = $2, attempts = $3 WHERE id = $1`,
[task.id, error, task.attempt]
);
}
// Recover stuck tasks (processing longer than timeout)
export async function recoverStuck(): Promise<number> {
const now = Date.now();
const stuck = await redis.zrangebyscore(PROCESSING_KEY, 0, now);
for (const taskStr of stuck) {
const task: Task = JSON.parse(taskStr);
await fail(task, "Task timed out (exceeded processing deadline)");
}
return stuck.length;
}
Step 2: Build the Worker
typescript
// src/queue/worker.ts — Task worker with graceful shutdown and concurrency control
import { dequeue, complete, fail, recoverStuck } from "./task-queue";
type TaskHandler = (payload: Record<string, any>) => Promise<void>;
const handlers = new Map<string, TaskHandler>();
let running = true;
let activeJobs = 0;
const MAX_CONCURRENT = 5;
export function registerHandler(type: string, handler: TaskHandler): void {
handlers.set(type, handler);
}
export async function startWorker(): Promise<void> {
console.log(`[worker] Started with concurrency=${MAX_CONCURRENT}`);
// Recover stuck tasks on startup
const recovered = await recoverStuck();
if (recovered > 0) console.log(`[worker] Recovered ${recovered} stuck tasks`);
// Periodic stuck task recovery
setInterval(() => recoverStuck(), 30000);
while (running) {
if (activeJobs >= MAX_CONCURRENT) {
await sleep(100);
continue;
}
const task = await dequeue();
if (!task) {
await sleep(500); // no tasks — poll less aggressively
continue;
}
activeJobs++;
processTask(task).finally(() => { activeJobs--; });
}
}
async function processTask(task: any): Promise<void> {
const handler = handlers.get(task.type);
if (!handler) {
await fail(task, `No handler registered for task type: ${task.type}`);
return;
}
const startTime = Date.now();
try {
// Timeout enforcement
await Promise.race([
handler(task.payload),
sleep(task.timeoutMs).then(() => {
throw new Error(`Task exceeded timeout of ${task.timeoutMs}ms`);
}),
]);
await complete(task);
console.log(`[worker] ✓ ${task.type} (${task.id}) in ${Date.now() - startTime}ms`);
} catch (err: any) {
await fail(task, err.message);
console.error(`[worker] ✗ ${task.type} (${task.id}) attempt ${task.attempt}: ${err.message}`);
}
}
// Graceful shutdown — finish active tasks, stop accepting new ones
export async function stopWorker(): Promise<void> {
running = false;
console.log(`[worker] Shutting down, waiting for ${activeJobs} active jobs...`);
const deadline = Date.now() + 30000;
while (activeJobs > 0 && Date.now() < deadline) {
await sleep(100);
}
if (activeJobs > 0) {
console.error(`[worker] Force shutdown with ${activeJobs} jobs still running`);
}
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
Results
- Zero lost jobs since deployment — tasks survive server restarts, deployments, and crashes; the 3,000 lost emails scenario is impossible
- Critical emails delivered in <5 seconds — priority queue ensures email sends (critical) aren't blocked by PDF exports (low); average email delivery went from 45s to 3s
- Automatic retry saved 12% of failed tasks — transient errors (API timeouts, rate limits) resolve on retry with exponential backoff; only truly broken tasks reach the dead letter queue
- Rate limiting prevents customer abuse — groupKey-based rate limiting ensures one customer's 10K-item export doesn't consume all worker capacity
- Dead letter queue catches persistent failures — instead of silently failing, broken tasks are preserved for debugging and can be retried after fixes