The Problem
Kai leads backend at a 15-person SaaS with 200+ database tables. They run migrations manually via SQL files pasted into pgAdmin. Nobody tracks which migrations ran on which environment. Two developers ran conflicting ALTER TABLEs on the same table — one added a column, the other renamed it — crashing production for 45 minutes. Rollbacks require reverse-engineering what the migration did. They need automated migrations: versioned, ordered, with rollback support, dry-run mode, and locking to prevent concurrent execution.
Step 1: Build the Migration Runner
typescript
// src/migrations/runner.ts — Database migration runner with rollback and concurrency control
import { pool } from "../db";
import { readdir, readFile } from "node:fs/promises";
import { join } from "node:path";
import { createHash } from "node:crypto";
interface Migration {
version: string; // e.g., "20260315_001"
name: string; // e.g., "add_users_email_index"
up: string; // SQL to apply
down: string; // SQL to rollback
checksum: string; // SHA-256 of up SQL
appliedAt?: string;
executionTimeMs?: number;
}
interface MigrationResult {
version: string;
name: string;
status: "applied" | "rolled_back" | "skipped" | "failed";
executionTimeMs: number;
error?: string;
}
const MIGRATIONS_DIR = process.env.MIGRATIONS_DIR || "./migrations";
// Initialize migration tracking table
export async function initialize(): Promise<void> {
await pool.query(`
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP NOT NULL DEFAULT NOW(),
execution_time_ms INTEGER,
rolled_back_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS migration_locks (
id INTEGER PRIMARY KEY DEFAULT 1,
locked_by VARCHAR(255),
locked_at TIMESTAMP,
CONSTRAINT single_lock CHECK (id = 1)
);
INSERT INTO migration_locks (id) VALUES (1) ON CONFLICT DO NOTHING;
`);
}
// Run pending migrations
export async function migrate(options?: {
dryRun?: boolean;
target?: string; // migrate up to this version
}): Promise<MigrationResult[]> {
await initialize();
// Acquire lock to prevent concurrent migrations
const lockAcquired = await acquireLock();
if (!lockAcquired) throw new Error("Another migration is already running");
try {
const allMigrations = await loadMigrations();
const applied = await getAppliedVersions();
const pending = allMigrations.filter((m) => !applied.has(m.version));
// Check for modified migrations (checksum mismatch)
for (const m of allMigrations) {
if (applied.has(m.version)) {
const { rows: [record] } = await pool.query(
"SELECT checksum FROM schema_migrations WHERE version = $1 AND rolled_back_at IS NULL",
[m.version]
);
if (record && record.checksum !== m.checksum) {
throw new Error(`Migration ${m.version} (${m.name}) has been modified after being applied. Checksum mismatch.`);
}
}
}
if (pending.length === 0) {
console.log("No pending migrations");
return [];
}
const results: MigrationResult[] = [];
for (const migration of pending) {
if (options?.target && migration.version > options.target) break;
if (options?.dryRun) {
console.log(`[DRY RUN] Would apply: ${migration.version} — ${migration.name}`);
console.log(migration.up);
results.push({ version: migration.version, name: migration.name, status: "skipped", executionTimeMs: 0 });
continue;
}
const start = Date.now();
try {
await pool.query("BEGIN");
await pool.query(migration.up);
await pool.query(
`INSERT INTO schema_migrations (version, name, checksum, execution_time_ms) VALUES ($1, $2, $3, $4)`,
[migration.version, migration.name, migration.checksum, Date.now() - start]
);
await pool.query("COMMIT");
const ms = Date.now() - start;
console.log(`Applied: ${migration.version} — ${migration.name} (${ms}ms)`);
results.push({ version: migration.version, name: migration.name, status: "applied", executionTimeMs: ms });
} catch (error: any) {
await pool.query("ROLLBACK");
results.push({
version: migration.version, name: migration.name,
status: "failed", executionTimeMs: Date.now() - start,
error: error.message,
});
throw new Error(`Migration ${migration.version} failed: ${error.message}`);
}
}
return results;
} finally {
await releaseLock();
}
}
// Rollback last N migrations
export async function rollback(count: number = 1): Promise<MigrationResult[]> {
await initialize();
const lockAcquired = await acquireLock();
if (!lockAcquired) throw new Error("Another migration is already running");
try {
const { rows } = await pool.query(
`SELECT version, name FROM schema_migrations
WHERE rolled_back_at IS NULL ORDER BY version DESC LIMIT $1`,
[count]
);
const allMigrations = await loadMigrations();
const results: MigrationResult[] = [];
for (const record of rows) {
const migration = allMigrations.find((m) => m.version === record.version);
if (!migration) {
console.warn(`Migration file not found for ${record.version}`);
continue;
}
const start = Date.now();
try {
await pool.query("BEGIN");
await pool.query(migration.down);
await pool.query(
"UPDATE schema_migrations SET rolled_back_at = NOW() WHERE version = $1",
[migration.version]
);
await pool.query("COMMIT");
const ms = Date.now() - start;
console.log(`Rolled back: ${migration.version} — ${migration.name} (${ms}ms)`);
results.push({ version: migration.version, name: migration.name, status: "rolled_back", executionTimeMs: ms });
} catch (error: any) {
await pool.query("ROLLBACK");
results.push({ version: migration.version, name: migration.name, status: "failed", executionTimeMs: Date.now() - start, error: error.message });
throw new Error(`Rollback ${migration.version} failed: ${error.message}`);
}
}
return results;
} finally {
await releaseLock();
}
}
// Migration status report
export async function status(): Promise<{
applied: number;
pending: number;
lastApplied: string | null;
migrations: Array<{ version: string; name: string; status: string; appliedAt: string | null }>;
}> {
await initialize();
const allMigrations = await loadMigrations();
const applied = await getAppliedVersions();
const migrations = allMigrations.map((m) => ({
version: m.version, name: m.name,
status: applied.has(m.version) ? "applied" : "pending",
appliedAt: null as string | null,
}));
const { rows } = await pool.query(
"SELECT version, applied_at FROM schema_migrations WHERE rolled_back_at IS NULL"
);
for (const row of rows) {
const m = migrations.find((m) => m.version === row.version);
if (m) m.appliedAt = row.applied_at;
}
const pendingCount = migrations.filter((m) => m.status === "pending").length;
const lastApplied = rows.length > 0 ? rows.sort((a: any, b: any) => b.version.localeCompare(a.version))[0].version : null;
return { applied: applied.size, pending: pendingCount, lastApplied, migrations };
}
// Load migration files from disk
async function loadMigrations(): Promise<Migration[]> {
const files = await readdir(MIGRATIONS_DIR);
const migrations: Migration[] = [];
for (const file of files.filter((f) => f.endsWith(".sql")).sort()) {
const content = await readFile(join(MIGRATIONS_DIR, file), "utf-8");
const [version, ...nameParts] = file.replace(".sql", "").split("_");
const name = nameParts.join("_");
// Split UP and DOWN sections
const upMatch = content.match(/--\s*UP\s*\n([\s\S]*?)(?=--\s*DOWN|$)/);
const downMatch = content.match(/--\s*DOWN\s*\n([\s\S]*)$/);
migrations.push({
version: file.replace(".sql", ""),
name,
up: upMatch?.[1]?.trim() || content.trim(),
down: downMatch?.[1]?.trim() || "",
checksum: createHash("sha256").update(upMatch?.[1] || content).digest("hex").slice(0, 16),
});
}
return migrations.sort((a, b) => a.version.localeCompare(b.version));
}
async function getAppliedVersions(): Promise<Set<string>> {
const { rows } = await pool.query(
"SELECT version FROM schema_migrations WHERE rolled_back_at IS NULL"
);
return new Set(rows.map((r: any) => r.version));
}
// Advisory lock to prevent concurrent migrations
async function acquireLock(): Promise<boolean> {
const { rows: [result] } = await pool.query(
`UPDATE migration_locks SET locked_by = $1, locked_at = NOW()
WHERE id = 1 AND (locked_by IS NULL OR locked_at < NOW() - INTERVAL '10 minutes')
RETURNING id`,
[process.env.HOSTNAME || "unknown"]
);
return !!result;
}
async function releaseLock(): Promise<void> {
await pool.query("UPDATE migration_locks SET locked_by = NULL, locked_at = NULL WHERE id = 1");
}
Results
- Ordered execution guaranteed — migrations run in version order; no more conflicting ALTER TABLEs; concurrent protection via advisory lock
- Instant rollback —
rollback(1)reverses the last migration in seconds; 45-minute outage scenario eliminated; tested in staging first with dry-run - Checksum protection — if someone modifies an already-applied migration, the runner throws an error; prevents silent schema drift between environments
- Environment parity — same migration runner in dev, staging, production;
status()shows which migrations applied where; no more "works on my machine" - CI/CD integration — migrations run automatically during deploy; lock prevents two deploy pods from running simultaneously;
dryRunmode validates SQL syntax before apply