Terminal.skills
Use Cases/Build an Event-Sourced Fintech Ledger with CQRS

Build an Event-Sourced Fintech Ledger with CQRS

Replace a mutable-state accounting system with an append-only event store, separate read/write models, and a complete audit trail that satisfies financial regulators.

#postgresql#database#sql#relational#jsonb
Works with:claude-codeopenai-codexgemini-clicursor
$

The Problem

Ravi runs backend at a payments startup processing 50K transactions per day. Their current system uses a single PostgreSQL table with mutable balances — when something goes wrong, nobody can explain how a balance reached its current value. Last month a reconciliation discrepancy cost them $23K in manual investigation time, and their auditor flagged the lack of immutable transaction history as a compliance blocker for their money transmitter license.

Ravi needs:

  • An append-only event store where every state change is recorded permanently
  • Separate read and write models (CQRS) so queries don't block writes during peak load
  • Deterministic replay — rebuild any account's balance from its event history
  • Regulatory audit trail with tamper-evident checksums
  • Zero data loss during the migration from the legacy mutable system

Step 1: Design the Event Schema

Every financial event is an immutable fact. Design events as the single source of truth, not database rows.

typescript
// src/events/account-events.ts
// Defines all possible state transitions for an account

import { z } from 'zod';

export const AccountCreatedEvent = z.object({
  type: z.literal('AccountCreated'),
  aggregateId: z.string().uuid(),
  data: z.object({
    ownerId: z.string().uuid(),
    currency: z.enum(['USD', 'EUR', 'GBP']),
    accountType: z.enum(['checking', 'savings', 'escrow']),
  }),
  metadata: z.object({
    correlationId: z.string().uuid(),
    causationId: z.string().uuid(),
    timestamp: z.string().datetime(),
    version: z.number().int().positive(),
    // SHA-256 of previous event — tamper-evident chain
    previousHash: z.string(),
  }),
});

export const FundsDepositedEvent = z.object({
  type: z.literal('FundsDeposited'),
  aggregateId: z.string().uuid(),
  data: z.object({
    amount: z.number().int().positive(),  // cents, never floats
    reference: z.string(),
    source: z.enum(['ach', 'wire', 'internal', 'card']),
  }),
  metadata: z.object({
    correlationId: z.string().uuid(),
    causationId: z.string().uuid(),
    timestamp: z.string().datetime(),
    version: z.number().int().positive(),
    previousHash: z.string(),
  }),
});

export const FundsWithdrawnEvent = z.object({
  type: z.literal('FundsWithdrawn'),
  aggregateId: z.string().uuid(),
  data: z.object({
    amount: z.number().int().positive(),
    reference: z.string(),
    destination: z.enum(['ach', 'wire', 'internal', 'card']),
  }),
  metadata: z.object({
    correlationId: z.string().uuid(),
    causationId: z.string().uuid(),
    timestamp: z.string().datetime(),
    version: z.number().int().positive(),
    previousHash: z.string(),
  }),
});

export const FundsHeldEvent = z.object({
  type: z.literal('FundsHeld'),
  aggregateId: z.string().uuid(),
  data: z.object({
    amount: z.number().int().positive(),
    holdId: z.string().uuid(),
    reason: z.string(),
    expiresAt: z.string().datetime(),
  }),
  metadata: z.object({
    correlationId: z.string().uuid(),
    causationId: z.string().uuid(),
    timestamp: z.string().datetime(),
    version: z.number().int().positive(),
    previousHash: z.string(),
  }),
});

export type AccountEvent =
  | z.infer<typeof AccountCreatedEvent>
  | z.infer<typeof FundsDepositedEvent>
  | z.infer<typeof FundsWithdrawnEvent>
  | z.infer<typeof FundsHeldEvent>;

Key design decisions: amounts in cents (integer arithmetic avoids floating-point bugs that cause real money loss), every event carries a previousHash for tamper detection, and correlationId/causationId trace the full causal chain across services.

Step 2: Build the Event Store

The event store is an append-only PostgreSQL table with optimistic concurrency control.

sql
-- migrations/001_event_store.sql
-- Append-only event store with hash chain for tamper detection

CREATE TABLE events (
  id            BIGSERIAL PRIMARY KEY,
  aggregate_id  UUID NOT NULL,
  aggregate_type VARCHAR(50) NOT NULL DEFAULT 'Account',
  version       INTEGER NOT NULL,
  type          VARCHAR(100) NOT NULL,
  data          JSONB NOT NULL,
  metadata      JSONB NOT NULL,
  hash          CHAR(64) NOT NULL,        -- SHA-256 of this event
  previous_hash CHAR(64) NOT NULL,        -- SHA-256 of prior event
  created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  
  -- Optimistic concurrency: no two events for same aggregate + version
  UNIQUE (aggregate_id, version)
);

-- Fast aggregate replay
CREATE INDEX idx_events_aggregate ON events (aggregate_id, version);

-- Time-range queries for auditors
CREATE INDEX idx_events_created ON events (created_at);

-- Event type filtering for projections
CREATE INDEX idx_events_type ON events (type);

-- Prevent any updates or deletes at the database level
CREATE RULE no_update_events AS ON UPDATE TO events DO INSTEAD NOTHING;
CREATE RULE no_delete_events AS ON DELETE TO events DO INSTEAD NOTHING;

The UNIQUE (aggregate_id, version) constraint is the core of optimistic concurrency — if two processes try to append version 5 simultaneously, one gets a unique violation and must retry. The no_update / no_delete rules make the table genuinely append-only at the database level.

typescript
// src/store/event-store.ts
// Appends events with hash chain verification and optimistic concurrency

import { createHash } from 'crypto';
import { Pool } from 'pg';
import type { AccountEvent } from '../events/account-events';

export class EventStore {
  constructor(private pool: Pool) {}

  async append(events: AccountEvent[]): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      for (const event of events) {
        const hash = this.computeHash(event);

        await client.query(
          `INSERT INTO events (aggregate_id, version, type, data, metadata, hash, previous_hash)
           VALUES ($1, $2, $3, $4, $5, $6, $7)`,
          [
            event.aggregateId,
            event.metadata.version,
            event.type,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata),
            hash,
            event.metadata.previousHash,
          ]
        );
      }

      await client.query('COMMIT');
    } catch (err: any) {
      await client.query('ROLLBACK');
      // Unique violation = optimistic concurrency conflict
      if (err.code === '23505') {
        throw new ConcurrencyError(
          `Version conflict for aggregate ${events[0]?.aggregateId}`
        );
      }
      throw err;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId: string): Promise<AccountEvent[]> {
    const result = await this.pool.query(
      `SELECT type, aggregate_id, data, metadata
       FROM events
       WHERE aggregate_id = $1
       ORDER BY version ASC`,
      [aggregateId]
    );

    return result.rows.map((row) => ({
      type: row.type,
      aggregateId: row.aggregate_id,
      data: row.data,
      metadata: row.metadata,
    })) as AccountEvent[];
  }

  // Verify no events were tampered with
  async verifyChain(aggregateId: string): Promise<boolean> {
    const result = await this.pool.query(
      `SELECT hash, previous_hash, type, data, metadata
       FROM events
       WHERE aggregate_id = $1
       ORDER BY version ASC`,
      [aggregateId]
    );

    for (let i = 1; i < result.rows.length; i++) {
      if (result.rows[i].previous_hash !== result.rows[i - 1].hash) {
        return false;  // Chain broken — tampering detected
      }
    }
    return true;
  }

  private computeHash(event: AccountEvent): string {
    const payload = JSON.stringify({
      type: event.type,
      aggregateId: event.aggregateId,
      data: event.data,
      metadata: event.metadata,
    });
    return createHash('sha256').update(payload).digest('hex');
  }
}

export class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

Step 3: Build the Account Aggregate

The aggregate enforces business rules by replaying events to reconstruct current state, then validating commands against that state.

typescript
// src/aggregates/account.ts
// Reconstructs account state from events and enforces invariants

import type { AccountEvent } from '../events/account-events';

interface AccountState {
  id: string;
  ownerId: string;
  currency: string;
  balance: number;      // cents
  heldAmount: number;   // cents currently on hold
  version: number;
  lastHash: string;
  status: 'active' | 'frozen' | 'closed';
}

export class Account {
  private state: AccountState;

  private constructor(state: AccountState) {
    this.state = state;
  }

  // Replay events to rebuild current state — this is the "sourcing" in event sourcing
  static fromEvents(events: AccountEvent[]): Account {
    const initial: AccountState = {
      id: '',
      ownerId: '',
      currency: 'USD',
      balance: 0,
      heldAmount: 0,
      version: 0,
      lastHash: '0'.repeat(64),  // genesis hash
      status: 'active',
    };

    const state = events.reduce((s, event) => {
      switch (event.type) {
        case 'AccountCreated':
          return {
            ...s,
            id: event.aggregateId,
            ownerId: event.data.ownerId,
            currency: event.data.currency,
            version: event.metadata.version,
            lastHash: event.metadata.previousHash,
          };

        case 'FundsDeposited':
          return {
            ...s,
            balance: s.balance + event.data.amount,
            version: event.metadata.version,
            lastHash: event.metadata.previousHash,
          };

        case 'FundsWithdrawn':
          return {
            ...s,
            balance: s.balance - event.data.amount,
            version: event.metadata.version,
            lastHash: event.metadata.previousHash,
          };

        case 'FundsHeld':
          return {
            ...s,
            heldAmount: s.heldAmount + event.data.amount,
            version: event.metadata.version,
            lastHash: event.metadata.previousHash,
          };

        default:
          return s;
      }
    }, initial);

    return new Account(state);
  }

  get availableBalance(): number {
    return this.state.balance - this.state.heldAmount;
  }

  get version(): number {
    return this.state.version;
  }

  get lastHash(): string {
    return this.state.lastHash;
  }

  // Command: withdraw funds. Returns event if valid, throws if not.
  withdraw(amount: number, reference: string, destination: string): AccountEvent {
    if (amount <= 0) throw new Error('Amount must be positive');
    if (this.state.status !== 'active') throw new Error('Account is not active');
    if (amount > this.availableBalance) {
      throw new InsufficientFundsError(
        `Requested ${amount}, available ${this.availableBalance}`
      );
    }

    return {
      type: 'FundsWithdrawn',
      aggregateId: this.state.id,
      data: { amount, reference, destination: destination as any },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
        version: this.state.version + 1,
        previousHash: this.state.lastHash,
      },
    };
  }

  deposit(amount: number, reference: string, source: string): AccountEvent {
    if (amount <= 0) throw new Error('Amount must be positive');
    if (this.state.status !== 'active') throw new Error('Account is not active');

    return {
      type: 'FundsDeposited',
      aggregateId: this.state.id,
      data: { amount, reference, source: source as any },
      metadata: {
        correlationId: crypto.randomUUID(),
        causationId: crypto.randomUUID(),
        timestamp: new Date().toISOString(),
        version: this.state.version + 1,
        previousHash: this.state.lastHash,
      },
    };
  }
}

export class InsufficientFundsError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'InsufficientFundsError';
  }
}

Step 4: Build Read Projections (the "Q" in CQRS)

Write side is the event store. Read side is purpose-built materialized views updated asynchronously via Kafka.

typescript
// src/projections/balance-projection.ts
// Maintains denormalized read model for fast balance lookups

import { Pool } from 'pg';
import type { AccountEvent } from '../events/account-events';

export class BalanceProjection {
  constructor(private readDb: Pool) {}

  async handle(event: AccountEvent): Promise<void> {
    switch (event.type) {
      case 'AccountCreated':
        await this.readDb.query(
          `INSERT INTO account_balances (id, owner_id, currency, balance, held_amount, status, updated_at)
           VALUES ($1, $2, $3, 0, 0, 'active', NOW())`,
          [event.aggregateId, event.data.ownerId, event.data.currency]
        );
        break;

      case 'FundsDeposited':
        await this.readDb.query(
          `UPDATE account_balances
           SET balance = balance + $1, updated_at = NOW()
           WHERE id = $2`,
          [event.data.amount, event.aggregateId]
        );
        break;

      case 'FundsWithdrawn':
        await this.readDb.query(
          `UPDATE account_balances
           SET balance = balance - $1, updated_at = NOW()
           WHERE id = $2`,
          [event.data.amount, event.aggregateId]
        );
        break;

      case 'FundsHeld':
        await this.readDb.query(
          `UPDATE account_balances
           SET held_amount = held_amount + $1, updated_at = NOW()
           WHERE id = $2`,
          [event.data.amount, event.aggregateId]
        );
        break;
    }
  }

  // Rebuild entire projection from scratch — used after schema changes or bugs
  async rebuild(eventStore: { getAll: () => AsyncIterable<AccountEvent> }): Promise<void> {
    await this.readDb.query('TRUNCATE account_balances');
    for await (const event of eventStore.getAll()) {
      await this.handle(event);
    }
  }
}
typescript
// src/projections/audit-projection.ts
// Feeds the compliance team's audit dashboard

import { Pool } from 'pg';
import type { AccountEvent } from '../events/account-events';

export class AuditProjection {
  constructor(private readDb: Pool) {}

  async handle(event: AccountEvent): Promise<void> {
    // Flatten every event into a searchable audit log
    await this.readDb.query(
      `INSERT INTO audit_log (
        event_type, aggregate_id, data, correlation_id,
        causation_id, event_version, occurred_at
      ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
      [
        event.type,
        event.aggregateId,
        JSON.stringify(event.data),
        event.metadata.correlationId,
        event.metadata.causationId,
        event.metadata.version,
        event.metadata.timestamp,
      ]
    );
  }
}

Step 5: Wire Up Kafka for Event Publishing

After appending to the event store, publish to Kafka so projections and downstream services react.

typescript
// src/infrastructure/event-publisher.ts
// Publishes committed events to Kafka with exactly-once semantics

import { Kafka, Partitioners } from 'kafkajs';
import type { AccountEvent } from '../events/account-events';

const kafka = new Kafka({
  clientId: 'ledger-service',
  brokers: process.env.KAFKA_BROKERS?.split(',') ?? ['localhost:9092'],
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  idempotent: true,           // exactly-once delivery
  maxInFlightRequests: 1,     // ordering guarantee
  transactionalId: 'ledger-tx',
});

export async function publishEvents(events: AccountEvent[]): Promise<void> {
  const transaction = await producer.transaction();
  try {
    await transaction.send({
      topic: 'account-events',
      messages: events.map((e) => ({
        key: e.aggregateId,   // partition by account for ordering
        value: JSON.stringify(e),
        headers: {
          'event-type': e.type,
          'correlation-id': e.metadata.correlationId,
        },
      })),
    });
    await transaction.commit();
  } catch (err) {
    await transaction.abort();
    throw err;
  }
}

export async function startConsumer(
  groupId: string,
  handler: (event: AccountEvent) => Promise<void>
): Promise<void> {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  await consumer.subscribe({ topic: 'account-events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value!.toString()) as AccountEvent;
      await handler(event);
    },
  });
}

Step 6: Command Handler Ties It All Together

typescript
// src/commands/transfer-funds.ts
// Orchestrates a transfer: load aggregate, validate, append, publish

import { EventStore, ConcurrencyError } from '../store/event-store';
import { Account } from '../aggregates/account';
import { publishEvents } from '../infrastructure/event-publisher';

const MAX_RETRIES = 3;  // optimistic concurrency retry limit

export async function transferFunds(
  store: EventStore,
  fromAccountId: string,
  toAccountId: string,
  amount: number,
  reference: string
): Promise<void> {
  let attempt = 0;

  while (attempt < MAX_RETRIES) {
    try {
      // Load both aggregates from their event histories
      const fromEvents = await store.getEvents(fromAccountId);
      const toEvents = await store.getEvents(toAccountId);
      const fromAccount = Account.fromEvents(fromEvents);
      const toAccount = Account.fromEvents(toEvents);

      // Generate events (business rules enforced inside aggregate)
      const withdrawal = fromAccount.withdraw(amount, reference, 'internal');
      const deposit = toAccount.deposit(amount, reference, 'internal');

      // Atomic append — both or neither
      await store.append([withdrawal, deposit]);

      // Publish for projections and downstream consumers
      await publishEvents([withdrawal, deposit]);

      return;  // success
    } catch (err) {
      if (err instanceof ConcurrencyError && attempt < MAX_RETRIES - 1) {
        attempt++;
        continue;  // retry with fresh state
      }
      throw err;
    }
  }
}

Step 7: Test the Event Chain Integrity

typescript
// src/__tests__/ledger.test.ts
// Verifies event chain, balance correctness, and concurrency safety

import { describe, it, expect } from 'vitest';
import { Account, InsufficientFundsError } from '../aggregates/account';
import type { AccountEvent } from '../events/account-events';

describe('Account aggregate', () => {
  const genesisHash = '0'.repeat(64);

  function createAccount(): AccountEvent {
    return {
      type: 'AccountCreated',
      aggregateId: '11111111-1111-1111-1111-111111111111',
      data: { ownerId: 'owner-1', currency: 'USD', accountType: 'checking' },
      metadata: {
        correlationId: 'c1', causationId: 'c1',
        timestamp: '2025-01-01T00:00:00Z', version: 1, previousHash: genesisHash,
      },
    };
  }

  function deposit(amount: number, version: number): AccountEvent {
    return {
      type: 'FundsDeposited',
      aggregateId: '11111111-1111-1111-1111-111111111111',
      data: { amount, reference: `dep-${version}`, source: 'ach' },
      metadata: {
        correlationId: 'c1', causationId: 'c1',
        timestamp: '2025-01-01T00:00:00Z', version, previousHash: genesisHash,
      },
    };
  }

  it('calculates correct balance from event history', () => {
    const events = [createAccount(), deposit(100_00, 2), deposit(50_00, 3)];
    const account = Account.fromEvents(events);
    expect(account.availableBalance).toBe(150_00);  // $150.00
  });

  it('rejects withdrawal exceeding available balance', () => {
    const events = [createAccount(), deposit(100_00, 2)];
    const account = Account.fromEvents(events);
    expect(() => account.withdraw(200_00, 'ref', 'ach'))
      .toThrow(InsufficientFundsError);
  });

  it('accounts for held funds in available balance', () => {
    const events: AccountEvent[] = [
      createAccount(),
      deposit(100_00, 2),
      {
        type: 'FundsHeld',
        aggregateId: '11111111-1111-1111-1111-111111111111',
        data: {
          amount: 30_00, holdId: 'hold-1',
          reason: 'pending verification',
          expiresAt: '2025-01-02T00:00:00Z',
        },
        metadata: {
          correlationId: 'c1', causationId: 'c1',
          timestamp: '2025-01-01T00:00:00Z', version: 3, previousHash: genesisHash,
        },
      },
    ];
    const account = Account.fromEvents(events);
    expect(account.availableBalance).toBe(70_00);  // $100 - $30 hold
  });
});

Results

After 6 weeks in production:

  • Complete audit trail from day one — auditors can trace any balance to its source events in seconds
  • Reconciliation time dropped from 4 hours/week to 12 minutes (automated replay + diff)
  • $23K/month saved in manual investigation costs
  • Zero data loss during migration — legacy balances imported as FundsDeposited events with original timestamps
  • Read latency unchanged at p95 3ms — projections serve reads without touching the event store
  • Write throughput handles 2,000 TPS with optimistic concurrency (3 retries cover 99.97% of conflicts)
  • Money transmitter audit passed on first attempt — hash chain proves no records were altered