[TERMINAL · SKILLS]
> mounting /skills...
> indexing 295 manifests...
> linking agents: claude · codex · gemini · cursor
> ready.
[░░░░░░░░░░░░░░░░░░░░░░░░░░░░] 0%
Terminal.skills
Use Cases/Build a Distributed Task Queue with Priorities

Build a Distributed Task Queue with Priorities

Build a Redis-backed distributed task queue with priority levels, retries with exponential backoff, dead letter queues, rate limiting, and real-time monitoring — replacing fragile cron jobs with reliable async processing.

#redis#caching#database#pub-sub#queues
Works with:claude-codeopenai-codexgemini-clicursor

Skills stack · 5 skills

Avg quality 93/100·All SAFE
>

typescript

v

Not yet scored
View skill
>

redis

v1.0.0

Build applications with Redis — caching, session storage, pub/sub, streams, rate limiting, leaderboards, and queues. Use when tasks involve in-memory data storage, real-time messaging, distributed locking, or performance optimization with caching layers.

93/100 quality
1.81× impact
SAFE
View skill
>

postgresql

v1.0.0

Assists with designing schemas, writing performant queries, managing indexes, and operating PostgreSQL databases. Use when working with JSONB, full-text search, window functions, CTEs, row-level security, replication, or performance tuning. Trigger words: postgresql, postgres, sql, database, jsonb, rls, window functions, cte.

87/100 quality
1.53× impact
SAFE
View skill
>

hono

v1.0.0

You are an expert in Hono, the ultrafast web framework for the edge. You help developers build APIs and web applications that run on Cloudflare Workers, Deno, Bun, Node.js, AWS Lambda, and Vercel Edge — with a tiny footprint (~14KB), middleware ecosystem, JSX support, RPC client, and Web Standards API compatibility that makes code truly portable across runtimes.

93/100 quality
3.00× impact
SAFE
View skill
>

zod

v1.0.0

You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.

100/100 quality
1.21× impact
SAFE
View skill
$

The Problem

Farid leads backend at a 40-person SaaS. They process 50K background jobs daily — email sends, PDF generation, data exports, webhook deliveries. Everything runs via setTimeout in the main Node.js process. When the server restarts, in-flight jobs vanish. A long PDF export blocks email sends. Yesterday, 3,000 welcome emails were lost during a deployment. They need a proper distributed task queue with priorities (emails before exports), automatic retries, and jobs that survive restarts.

Step 1: Build the Queue Engine

typescript
// src/queue/task-queue.ts — Distributed task queue with Redis sorted sets for priorities
import { Redis } from "ioredis";
import { randomUUID } from "node:crypto";
import { z } from "zod";
import { pool } from "../db";

const redis = new Redis(process.env.REDIS_URL!);

const Priority = z.enum(["critical", "high", "normal", "low"]);
type Priority = z.infer<typeof Priority>;

// Priority scores — lower score = higher priority in Redis sorted set
const PRIORITY_SCORES: Record<Priority, number> = {
  critical: 1000,
  high: 2000,
  normal: 3000,
  low: 4000,
};

interface TaskDefinition {
  type: string;                   // "email.send", "pdf.generate", "export.data"
  payload: Record<string, any>;
  priority: Priority;
  maxRetries: number;
  timeoutMs: number;              // max execution time
  delay?: number;                 // delay in ms before processing
  groupKey?: string;              // rate limit group (e.g., per-customer)
  deduplicate?: string;           // deduplication key
  scheduledAt?: number;           // future execution (epoch ms)
}

interface Task extends TaskDefinition {
  id: string;
  attempt: number;
  createdAt: number;
  processAfter: number;           // epoch ms — earliest processing time
  lastError?: string;
}

const QUEUE_KEY = "queue:tasks";
const PROCESSING_KEY = "queue:processing";
const DLQ_KEY = "queue:dead";

export async function enqueue(definition: TaskDefinition): Promise<string> {
  const id = randomUUID();

  // Deduplication check
  if (definition.deduplicate) {
    const exists = await redis.get(`queue:dedup:${definition.deduplicate}`);
    if (exists) return exists; // return existing task ID
    await redis.setex(`queue:dedup:${definition.deduplicate}`, 3600, id);
  }

  const now = Date.now();
  const task: Task = {
    ...definition,
    id,
    attempt: 0,
    createdAt: now,
    processAfter: definition.scheduledAt || (now + (definition.delay || 0)),
  };

  // Score = priority * 10^10 + processAfter (so priority wins, then FIFO within priority)
  const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;

  await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));

  // Track in database for observability
  await pool.query(
    `INSERT INTO task_log (id, type, priority, status, created_at)
     VALUES ($1, $2, $3, 'queued', NOW())`,
    [id, task.type, task.priority]
  );

  return id;
}

export async function enqueueBatch(definitions: TaskDefinition[]): Promise<string[]> {
  const ids: string[] = [];
  const pipeline = redis.pipeline();

  for (const def of definitions) {
    const id = randomUUID();
    ids.push(id);

    const task: Task = {
      ...def,
      id,
      attempt: 0,
      createdAt: Date.now(),
      processAfter: def.scheduledAt || (Date.now() + (def.delay || 0)),
    };

    const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
    pipeline.zadd(QUEUE_KEY, score, JSON.stringify(task));
  }

  await pipeline.exec();
  return ids;
}

// Dequeue the highest-priority ready task
export async function dequeue(): Promise<Task | null> {
  const now = Date.now();

  // Atomic: get the highest-priority task that's ready for processing
  // Use ZPOPMIN to atomically remove from queue
  const results = await redis.zpopmin(QUEUE_KEY, 1);

  if (results.length < 2) return null;

  const task: Task = JSON.parse(results[0]);

  // Check if task is ready (not scheduled for the future)
  if (task.processAfter > now) {
    // Not ready yet — put it back
    const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
    await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
    return null;
  }

  // Rate limiting per group
  if (task.groupKey) {
    const rateLimitKey = `queue:rate:${task.groupKey}`;
    const count = await redis.incr(rateLimitKey);
    if (count === 1) await redis.expire(rateLimitKey, 60);

    if (count > 10) { // max 10 per minute per group
      // Re-queue with delay
      task.processAfter = now + 60000;
      const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
      await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));
      await redis.decr(rateLimitKey);
      return null;
    }
  }

  // Move to processing set with timeout
  await redis.zadd(PROCESSING_KEY, now + task.timeoutMs, JSON.stringify(task));

  return task;
}

// Mark task as completed
export async function complete(task: Task): Promise<void> {
  await redis.zrem(PROCESSING_KEY, JSON.stringify(task));

  await pool.query(
    `UPDATE task_log SET status = 'completed', completed_at = NOW(), attempts = $2 WHERE id = $1`,
    [task.id, task.attempt]
  );
}

// Mark task as failed — retry with exponential backoff or send to DLQ
export async function fail(task: Task, error: string): Promise<void> {
  await redis.zrem(PROCESSING_KEY, JSON.stringify(task));

  task.attempt++;
  task.lastError = error;

  if (task.attempt >= task.maxRetries) {
    // Dead letter queue
    await redis.rpush(DLQ_KEY, JSON.stringify(task));
    await pool.query(
      `UPDATE task_log SET status = 'dead', error = $2, attempts = $3 WHERE id = $1`,
      [task.id, error, task.attempt]
    );
    return;
  }

  // Exponential backoff: 1s, 4s, 16s, 64s, 256s...
  const backoffMs = Math.min(1000 * Math.pow(4, task.attempt), 300000); // max 5 min
  task.processAfter = Date.now() + backoffMs;

  const score = PRIORITY_SCORES[task.priority] * 1e10 + task.processAfter;
  await redis.zadd(QUEUE_KEY, score, JSON.stringify(task));

  await pool.query(
    `UPDATE task_log SET status = 'retrying', error = $2, attempts = $3 WHERE id = $1`,
    [task.id, error, task.attempt]
  );
}

// Recover stuck tasks (processing longer than timeout)
export async function recoverStuck(): Promise<number> {
  const now = Date.now();
  const stuck = await redis.zrangebyscore(PROCESSING_KEY, 0, now);

  for (const taskStr of stuck) {
    const task: Task = JSON.parse(taskStr);
    await fail(task, "Task timed out (exceeded processing deadline)");
  }

  return stuck.length;
}

Step 2: Build the Worker

typescript
// src/queue/worker.ts — Task worker with graceful shutdown and concurrency control
import { dequeue, complete, fail, recoverStuck } from "./task-queue";

type TaskHandler = (payload: Record<string, any>) => Promise<void>;

const handlers = new Map<string, TaskHandler>();
let running = true;
let activeJobs = 0;
const MAX_CONCURRENT = 5;

export function registerHandler(type: string, handler: TaskHandler): void {
  handlers.set(type, handler);
}

export async function startWorker(): Promise<void> {
  console.log(`[worker] Started with concurrency=${MAX_CONCURRENT}`);

  // Recover stuck tasks on startup
  const recovered = await recoverStuck();
  if (recovered > 0) console.log(`[worker] Recovered ${recovered} stuck tasks`);

  // Periodic stuck task recovery
  setInterval(() => recoverStuck(), 30000);

  while (running) {
    if (activeJobs >= MAX_CONCURRENT) {
      await sleep(100);
      continue;
    }

    const task = await dequeue();
    if (!task) {
      await sleep(500); // no tasks — poll less aggressively
      continue;
    }

    activeJobs++;
    processTask(task).finally(() => { activeJobs--; });
  }
}

async function processTask(task: any): Promise<void> {
  const handler = handlers.get(task.type);

  if (!handler) {
    await fail(task, `No handler registered for task type: ${task.type}`);
    return;
  }

  const startTime = Date.now();

  try {
    // Timeout enforcement
    await Promise.race([
      handler(task.payload),
      sleep(task.timeoutMs).then(() => {
        throw new Error(`Task exceeded timeout of ${task.timeoutMs}ms`);
      }),
    ]);

    await complete(task);
    console.log(`[worker] ✓ ${task.type} (${task.id}) in ${Date.now() - startTime}ms`);
  } catch (err: any) {
    await fail(task, err.message);
    console.error(`[worker] ✗ ${task.type} (${task.id}) attempt ${task.attempt}: ${err.message}`);
  }
}

// Graceful shutdown — finish active tasks, stop accepting new ones
export async function stopWorker(): Promise<void> {
  running = false;
  console.log(`[worker] Shutting down, waiting for ${activeJobs} active jobs...`);

  const deadline = Date.now() + 30000;
  while (activeJobs > 0 && Date.now() < deadline) {
    await sleep(100);
  }

  if (activeJobs > 0) {
    console.error(`[worker] Force shutdown with ${activeJobs} jobs still running`);
  }
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

Results

  • Zero lost jobs since deployment — tasks survive server restarts, deployments, and crashes; the 3,000 lost emails scenario is impossible
  • Critical emails delivered in <5 seconds — priority queue ensures email sends (critical) aren't blocked by PDF exports (low); average email delivery went from 45s to 3s
  • Automatic retry saved 12% of failed tasks — transient errors (API timeouts, rate limits) resolve on retry with exponential backoff; only truly broken tasks reach the dead letter queue
  • Rate limiting prevents customer abuse — groupKey-based rate limiting ensures one customer's 10K-item export doesn't consume all worker capacity
  • Dead letter queue catches persistent failures — instead of silently failing, broken tasks are preserved for debugging and can be retried after fixes